如何使用列中定义的条件连接突触数据湖表

how to join synapse data lake tables with condition defined in column

我无法弄清楚如何进行此联接。我有两个突触数据湖 tables,例如:

EquipmentReading

equipment_id,time_utc,temperature
6,2022-05-20T02:16,70
6,2022-05-20T02:17,80
6,2022-05-20T02:18,90
AlertDefinition

id,condition,value,description
1,>=,90,the temperature is too high
2,<=,70,the temperature is too low

我想加入以创建第三个数据湖 table,例如:

Incident

alert_id,equipment_id,time_utc
2,6,2022-05-20T02:16
1,6,2022-05-20T02:18

连接需要有条件,因为 EquipmentReading table 中的 'temperature' 列应该使用 AlertDefinition 中的 'condition' 和 'value' 字段进行比较table 在事件 table 中创建行。我正在看像 data flow joins 这样的教程,但真的不确定如何开始。

我想我可以像这样在 pyspark 代码中加入:

alert_definition_sdf = spark.sql("SELECT * FROM db1.AlertDefinition ORDER BY id ASC")
equipment_reading_sdf = spark.sql("SELECT * FROM db1.EquipmentReading ORDER BY time_utc ASC")

readings = equipment_reading_sdf.select('equipment_id', 'temperature', 'time_utc').collect()
alert_definitions = alert_definition_sdf.select('id', 'condition', 'value').collect()

incidents = []
for reading in readings:
    for alert_definition in alert_definitions:
        eval_string = str(reading['temperature']) + alert_definition['condition'] + str(alert_definition['value'])
        if (eval(eval_string)):
            alert = {
                "equipment_id": reading['equipment_id'],
                "alert_id": alert_definition['id'],
                "time_utc": reading['time_utc']   
            }
            incidents.append(alert)

incident_table_sdf = spark.createDataFrame(incidents)
incident_table_sdf.write.format('csv').option('header',True).mode('overwrite').saveAsTable("db1.Incident")

但这似乎不太理想,尤其是因为它使用了仅适用于小型数据集的 'collect' 方法。我正试图让它作为数据流连接工作。

在处理大量数据时,使用 collect() 和嵌套循环都不是理想的选择。由于条件运算符的值是 AlertDefinition table 中的一列,因此没有其他选择,只能使用 collect 来获取条件运算符和值。有更好的方法来实现这一点,而不是使用多个 collect() 和嵌套循环。

使用 createOrReplaceTempView() 为两个数据框创建临时视图。使用 collect() 获取 'condition' 和 'value' 列的值。使用 for 循环,创建一个 SQL 查询。使用 spark.sql() 执行此查询以获得所需的结果。尝试使用以下代码。

equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')

list_of_conditions = alert_definition_sdf.select('condition', 'value').collect()
query = "select a.id,r.equipment_id,r.time_utc from reading r, alert a where "
for i in list_of_conditions:
    condition = "(a.condition='"+ i['condition'] +"' and r.temperature"+ i['condition']+str(i['value']) +")"
    query = query + condition +" or "

solution_df = spark.sql(query[:-3])  #slicing to remove an extra 'or'

可能无法使用数据流连接,因为您要应用的条件在 AlertDefiniton table 中以 'column values' 的形式存在(而不是作为列)并且此 [=45] 中的每一行=] 代表每个条件。您需要在提取这些列(值和条件列)的值后构建查询,因为它们是应用条件所必需的,上面指定的方法可能是更好的方法。

输出图像:https://i.stack.imgur.com/4jnli.png

另一种方法:

我们正在使用 collect()AlertDefinition table 中获取 conditionvalue 列的值。但是在这种情况下,无论这个table中有多少条记录,只可能条件列中只能有6个不同的条件运算符值(>=,<=,=,<,>, <>).

因此,我们可以手动编写一个 SQL 查询,包括所有可能的条件运算符,而不考虑 AlertDefinition 中记录的值数。这样就不用collect()了。以下方法可能适用于给定的示例 table 数据(为更大的 table 数据添加所有剩余的 4 个条件运算符)。

equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')

solution_df = spark.sql("""select a.id,r.equipment_id,r.time_utc from reading r, alert a where
    (a.condition = '>=' and r.temperature>=a.value) or
    (a.condition = '<=' and r.temperature<=a.value)
""")