Inner Join with streaming dataframes
So I have this streaming dataframe (gps_messages) in PySpark -
and I want the resulting dataframe to have the same (all) columns, but only the record/row with the highest (MAX) timestamp value per device_unique_id, so basically something like this -
+----------------+-----------+--------+---------+---------+----------+
|device_unique_id|signal_type|latitude|longitude|elevation| Timestamp|
+----------------+-----------+--------+---------+---------+----------+
| TR1 |loc_update |-35.5484|149.61684|666.47164| 12345 | <-- *NOTE - please check below
| TR2 |loc_update |-35.5484|149.61684|666.47164| 87251 |
| TR3 |loc_update |-35.5484|149.61684|666.47164| 32458 |
| TR4 |loc_update |-35.5484|149.61684|666.47164| 98274 |
+----------------+-----------+--------+---------+---------+----------+
*Note - only one record for TR1 from the previous dataframe, the one with the max timestamp value among all records having device_unique_id == 'TR1'
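For reference, on a static (non-streaming) dataframe I could get this with a window function, roughly like the sketch below; this kind of window is not supported on streaming dataframes, which is why I am trying a join instead.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Batch-only sketch: keep the row with the highest timestamp per device_unique_id.
w = Window.partitionBy('device_unique_id').orderBy(F.col('timestamp').desc())
latest_per_device = (gps_messages
                     .withColumn('rn', F.row_number().over(w))
                     .filter(F.col('rn') == 1)
                     .drop('rn'))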
So far I have written this code:
gps_messages.createOrReplaceTempView('gps_table')
SQL_QUERY = 'SELECT device_unique_id, max(timestamp) as timestamp ' \
'FROM gps_table ' \
'GROUP BY device_unique_id'
# SQL_QUERY1 = 'SELECT * ' \
# 'FROM gps_table t2 ' \
# 'JOIN (SELECT device_unique_id AS unique_id, max(timestamp) AS time ' \
# 'FROM gps_table t1 ' \
# 'GROUP BY unique_id) t1 ' \
# 'ON t2.device_unique_id = t1.unique_id ' \
# 'AND t2.timestamp = t1.time'
filtered_gps_messages = spark.sql(SQL_QUERY)
filtered_gps_messages.createOrReplaceTempView('table_max_ts')
SQL_QUERY = 'SELECT a.device_unique_id, a.signal_type, a.longitude, a.latitude, a.timestamp ' \
'FROM table_max_ts b, gps_table a ' \
'WHERE b.timestamp==a.timestamp AND b.device_unique_id==a.device_unique_id'
latest_data_df = spark.sql(SQL_QUERY)
query = latest_data_df \
.writeStream \
.outputMode('append') \
.format('console') \
.start()
query.awaitTermination()
It throws this error -
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nProject [device_unique_id#25, signal_type#26, latitude#27, longitude#28, elevation#29, timestamp#30, unique_id#43, time#44]\n+- Join Inner, ((device_unique_id#25 = unique_id#43) && (timestamp#30 = time#44))\n :- SubqueryAlias `t2`\n : +- SubqueryAlias `gps_table`\n : +- Project [json#23.device_unique_id AS device_unique_id#25, json#23.signal_type AS signal_type#26, json#23.latitude AS latitude#27, json#23.longitude AS longitude#28, json#23.elevation AS elevation#29, json#23.timestamp AS timestamp#30]\n : +- Project [jsontostructs(StructField(device_unique_id,StringType,true), StructField(signal_type,StringType,true), StructField(latitude,StringType,true), StructField(longitude,StringType,true), StructField(elevation,StringType,true), StructField(timestamp,StringType,true), value#21, Some(Asia/Kolkata)) AS json#23]\n : +- Project [cast(value#8 as string) AS value#21]\n : +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49a5cdc2, kafka, Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@611544,kafka,List(),None,List(),None,Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n +- SubqueryAlias `t1`\n +- Aggregate [device_unique_id#25], [device_unique_id#25 AS unique_id#43, max(timestamp#30) AS time#44]\n +- SubqueryAlias `t1`\n +- SubqueryAlias `gps_table`\n +- Project [json#23.device_unique_id AS device_unique_id#25, json#23.signal_type AS signal_type#26, json#23.latitude AS latitude#27, json#23.longitude AS longitude#28, json#23.elevation AS elevation#29, json#23.timestamp AS timestamp#30]\n +- Project [jsontostructs(StructField(device_unique_id,StringType,true), StructField(signal_type,StringType,true), StructField(latitude,StringType,true), StructField(longitude,StringType,true), StructField(elevation,StringType,true), StructField(timestamp,StringType,true), value#21, Some(Asia/Kolkata)) AS json#23]\n +- Project [cast(value#8 as string) AS value#21]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49a5cdc2, kafka, Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@611544,kafka,List(),None,List(),None,Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n'
Process finished with exit code 1
If I try to use the "complete" output mode instead, it shows -
Analysis Exception: Inner Join between two streaming dataframes/datasets is not supported in Complete mode, only in append mode.
What am I doing wrong here? Is there any other approach or workaround?
Sorry for the kind of question, I'm new to this.
Thanks.
Have a look here => http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries - some joins are not supported in streaming mode.
Maybe use a left outer join instead,
and writing in append mode should do the trick:
SQL_QUERY = 'SELECT a.device_unique_id, a.signal_type, a.longitude, a.latitude, a.timestamp ' \
            'FROM table_max_ts b ' \
            'LEFT JOIN gps_table a ' \
            'ON b.timestamp==a.timestamp AND b.device_unique_id==a.device_unique_id'
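A minimal sketch of wiring that query into the rest of the pipeline (same temp views and console sink as in the question):
latest_data_df = spark.sql(SQL_QUERY)

# Stream-stream joins are only supported in append output mode.
query = latest_data_df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()
query.awaitTermination()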
EDIT: Watermarks are needed to make sure the right data is seen in time, for the outer join:
filtered_gps_messagesW = filtered_gps_messages.withWatermark("timestamp", "2 hours")
gps_messagesW= gps_messages.withWatermark("timestamp", "3 hours")
Then register the watermarked dataframes as temp tables and you should be OK. Adjust the time intervals to suit your needs.
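A rough sketch of that registration step, assuming the timestamp field arrives as a string and has to be cast to a TimestampType column first (withWatermark only works on an event-time column); the intervals are placeholders:
from pyspark.sql import functions as F

# Cast to an event-time column so withWatermark() can be applied.
gps_messages = gps_messages.withColumn('timestamp', F.col('timestamp').cast('timestamp'))

# Watermark the raw stream before the aggregation, then the aggregated side as well.
gps_messagesW = gps_messages.withWatermark('timestamp', '3 hours')
gps_messagesW.createOrReplaceTempView('gps_table')

filtered_gps_messages = spark.sql(
    'SELECT device_unique_id, max(timestamp) AS timestamp '
    'FROM gps_table GROUP BY device_unique_id')
filtered_gps_messagesW = filtered_gps_messages.withWatermark('timestamp', '2 hours')
filtered_gps_messagesW.createOrReplaceTempView('table_max_ts')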