旋转流式数据框 pyspark

Pivot a streaming dataframe pyspark

我有一个来自 kafka 的流数据帧,我需要旋转两列。这是我目前使用的代码:

streaming_df = streaming_df.groupBy('Id','Date')\
            .pivot('Var')\
            .agg(first('Val'))

query = streaming_df.limit(5) \
            .writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("stream") \
            .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()

`

我收到以下错误: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

pyspark 版本:3.1.1

关于如何使用流式数据帧实现枢轴的任何想法?

应用于流数据时,Spark 不支持 pivot 转换。

你可以做的是将 foreachBatch 与用户定义的函数一起使用,如下所示:

def apply_pivot(stream_df, batch_id):
    # Here your pivot transformation
    stream_df \
        .groupBy('Id','Date') \
        .pivot('Var') \
        .agg(first('Val')) \
        .write \
        .format('memory') \
        .outputMode('append') \
        .queryName("stream")

query = streaming_df.limit(5) \
    .writeStream \
    .foreachBatch(apply_pivot) \
    .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()

如果对您有帮助,请告诉我!