旋转流式数据框 pyspark
Pivot a streaming dataframe pyspark
我有一个来自 kafka 的流数据帧,我需要旋转两列。这是我目前使用的代码:
streaming_df = streaming_df.groupBy('Id','Date')\
.pivot('Var')\
.agg(first('Val'))
query = streaming_df.limit(5) \
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("stream") \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
`
我收到以下错误:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
pyspark 版本:3.1.1
关于如何使用流式数据帧实现枢轴的任何想法?
应用于流数据时,Spark 不支持 pivot
转换。
你可以做的是将 foreachBatch
与用户定义的函数一起使用,如下所示:
def apply_pivot(stream_df, batch_id):
# Here your pivot transformation
stream_df \
.groupBy('Id','Date') \
.pivot('Var') \
.agg(first('Val')) \
.write \
.format('memory') \
.outputMode('append') \
.queryName("stream")
query = streaming_df.limit(5) \
.writeStream \
.foreachBatch(apply_pivot) \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
如果对您有帮助,请告诉我!
我有一个来自 kafka 的流数据帧,我需要旋转两列。这是我目前使用的代码:
streaming_df = streaming_df.groupBy('Id','Date')\
.pivot('Var')\
.agg(first('Val'))
query = streaming_df.limit(5) \
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("stream") \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
`
我收到以下错误:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
pyspark 版本:3.1.1
关于如何使用流式数据帧实现枢轴的任何想法?
应用于流数据时,Spark 不支持 pivot
转换。
你可以做的是将 foreachBatch
与用户定义的函数一起使用,如下所示:
def apply_pivot(stream_df, batch_id):
# Here your pivot transformation
stream_df \
.groupBy('Id','Date') \
.pivot('Var') \
.agg(first('Val')) \
.write \
.format('memory') \
.outputMode('append') \
.queryName("stream")
query = streaming_df.limit(5) \
.writeStream \
.foreachBatch(apply_pivot) \
.start()
time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()
如果对您有帮助,请告诉我!