如何使用列中定义的条件连接突触数据湖表
how to join synapse data lake tables with condition defined in column
我无法弄清楚如何进行此联接。我有两个突触数据湖 tables,例如:
EquipmentReading
equipment_id,time_utc,temperature
6,2022-05-20T02:16,70
6,2022-05-20T02:17,80
6,2022-05-20T02:18,90
AlertDefinition
id,condition,value,description
1,>=,90,the temperature is too high
2,<=,70,the temperature is too low
我想加入以创建第三个数据湖 table,例如:
Incident
alert_id,equipment_id,time_utc
2,6,2022-05-20T02:16
1,6,2022-05-20T02:18
连接需要有条件,因为 EquipmentReading table 中的 'temperature' 列应该使用 AlertDefinition 中的 'condition' 和 'value' 字段进行比较table 在事件 table 中创建行。我正在看像 data flow joins 这样的教程,但真的不确定如何开始。
我想我可以像这样在 pyspark 代码中加入:
alert_definition_sdf = spark.sql("SELECT * FROM db1.AlertDefinition ORDER BY id ASC")
equipment_reading_sdf = spark.sql("SELECT * FROM db1.EquipmentReading ORDER BY time_utc ASC")
readings = equipment_reading_sdf.select('equipment_id', 'temperature', 'time_utc').collect()
alert_definitions = alert_definition_sdf.select('id', 'condition', 'value').collect()
incidents = []
for reading in readings:
for alert_definition in alert_definitions:
eval_string = str(reading['temperature']) + alert_definition['condition'] + str(alert_definition['value'])
if (eval(eval_string)):
alert = {
"equipment_id": reading['equipment_id'],
"alert_id": alert_definition['id'],
"time_utc": reading['time_utc']
}
incidents.append(alert)
incident_table_sdf = spark.createDataFrame(incidents)
incident_table_sdf.write.format('csv').option('header',True).mode('overwrite').saveAsTable("db1.Incident")
但这似乎不太理想,尤其是因为它使用了仅适用于小型数据集的 'collect' 方法。我正试图让它作为数据流连接工作。
在处理大量数据时,使用 collect()
和嵌套循环都不是理想的选择。由于条件运算符的值是 AlertDefinition
table 中的一列,因此没有其他选择,只能使用 collect 来获取条件运算符和值。有更好的方法来实现这一点,而不是使用多个 collect() 和嵌套循环。
使用 createOrReplaceTempView()
为两个数据框创建临时视图。使用 collect() 获取 'condition' 和 'value' 列的值。使用 for 循环,创建一个 SQL 查询。使用 spark.sql()
执行此查询以获得所需的结果。尝试使用以下代码。
equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')
list_of_conditions = alert_definition_sdf.select('condition', 'value').collect()
query = "select a.id,r.equipment_id,r.time_utc from reading r, alert a where "
for i in list_of_conditions:
condition = "(a.condition='"+ i['condition'] +"' and r.temperature"+ i['condition']+str(i['value']) +")"
query = query + condition +" or "
solution_df = spark.sql(query[:-3]) #slicing to remove an extra 'or'
可能无法使用数据流连接,因为您要应用的条件在 AlertDefiniton
table 中以 'column values' 的形式存在(而不是作为列)并且此 [=45] 中的每一行=] 代表每个条件。您需要在提取这些列(值和条件列)的值后构建查询,因为它们是应用条件所必需的,上面指定的方法可能是更好的方法。
输出图像:https://i.stack.imgur.com/4jnli.png
另一种方法:
我们正在使用 collect()
从 AlertDefinition
table 中获取 condition
和 value
列的值。但是在这种情况下,无论这个table中有多少条记录,只可能条件列中只能有6个不同的条件运算符值(>=,<=,=,<,>, <>).
因此,我们可以手动编写一个 SQL 查询,包括所有可能的条件运算符,而不考虑 AlertDefinition
中记录的值数。这样就不用collect()
了。以下方法可能适用于给定的示例 table 数据(为更大的 table 数据添加所有剩余的 4 个条件运算符)。
equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')
solution_df = spark.sql("""select a.id,r.equipment_id,r.time_utc from reading r, alert a where
(a.condition = '>=' and r.temperature>=a.value) or
(a.condition = '<=' and r.temperature<=a.value)
""")
我无法弄清楚如何进行此联接。我有两个突触数据湖 tables,例如:
EquipmentReading
equipment_id,time_utc,temperature
6,2022-05-20T02:16,70
6,2022-05-20T02:17,80
6,2022-05-20T02:18,90
AlertDefinition
id,condition,value,description
1,>=,90,the temperature is too high
2,<=,70,the temperature is too low
我想加入以创建第三个数据湖 table,例如:
Incident
alert_id,equipment_id,time_utc
2,6,2022-05-20T02:16
1,6,2022-05-20T02:18
连接需要有条件,因为 EquipmentReading table 中的 'temperature' 列应该使用 AlertDefinition 中的 'condition' 和 'value' 字段进行比较table 在事件 table 中创建行。我正在看像 data flow joins 这样的教程,但真的不确定如何开始。
我想我可以像这样在 pyspark 代码中加入:
alert_definition_sdf = spark.sql("SELECT * FROM db1.AlertDefinition ORDER BY id ASC")
equipment_reading_sdf = spark.sql("SELECT * FROM db1.EquipmentReading ORDER BY time_utc ASC")
readings = equipment_reading_sdf.select('equipment_id', 'temperature', 'time_utc').collect()
alert_definitions = alert_definition_sdf.select('id', 'condition', 'value').collect()
incidents = []
for reading in readings:
for alert_definition in alert_definitions:
eval_string = str(reading['temperature']) + alert_definition['condition'] + str(alert_definition['value'])
if (eval(eval_string)):
alert = {
"equipment_id": reading['equipment_id'],
"alert_id": alert_definition['id'],
"time_utc": reading['time_utc']
}
incidents.append(alert)
incident_table_sdf = spark.createDataFrame(incidents)
incident_table_sdf.write.format('csv').option('header',True).mode('overwrite').saveAsTable("db1.Incident")
但这似乎不太理想,尤其是因为它使用了仅适用于小型数据集的 'collect' 方法。我正试图让它作为数据流连接工作。
在处理大量数据时,使用 collect()
和嵌套循环都不是理想的选择。由于条件运算符的值是 AlertDefinition
table 中的一列,因此没有其他选择,只能使用 collect 来获取条件运算符和值。有更好的方法来实现这一点,而不是使用多个 collect() 和嵌套循环。
使用 createOrReplaceTempView()
为两个数据框创建临时视图。使用 collect() 获取 'condition' 和 'value' 列的值。使用 for 循环,创建一个 SQL 查询。使用 spark.sql()
执行此查询以获得所需的结果。尝试使用以下代码。
equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')
list_of_conditions = alert_definition_sdf.select('condition', 'value').collect()
query = "select a.id,r.equipment_id,r.time_utc from reading r, alert a where "
for i in list_of_conditions:
condition = "(a.condition='"+ i['condition'] +"' and r.temperature"+ i['condition']+str(i['value']) +")"
query = query + condition +" or "
solution_df = spark.sql(query[:-3]) #slicing to remove an extra 'or'
可能无法使用数据流连接,因为您要应用的条件在 AlertDefiniton
table 中以 'column values' 的形式存在(而不是作为列)并且此 [=45] 中的每一行=] 代表每个条件。您需要在提取这些列(值和条件列)的值后构建查询,因为它们是应用条件所必需的,上面指定的方法可能是更好的方法。
输出图像:https://i.stack.imgur.com/4jnli.png
另一种方法:
我们正在使用 collect()
从 AlertDefinition
table 中获取 condition
和 value
列的值。但是在这种情况下,无论这个table中有多少条记录,只可能条件列中只能有6个不同的条件运算符值(>=,<=,=,<,>, <>).
因此,我们可以手动编写一个 SQL 查询,包括所有可能的条件运算符,而不考虑 AlertDefinition
中记录的值数。这样就不用collect()
了。以下方法可能适用于给定的示例 table 数据(为更大的 table 数据添加所有剩余的 4 个条件运算符)。
equipment_reading_sdf.createOrReplaceTempView('reading')
alert_definition_sdf.createOrReplaceTempView('alert')
solution_df = spark.sql("""select a.id,r.equipment_id,r.time_utc from reading r, alert a where
(a.condition = '>=' and r.temperature>=a.value) or
(a.condition = '<=' and r.temperature<=a.value)
""")