如何将 Spark 直播流与另一个流在其整个生命周期中收集的所有数据结合起来?
How can I join a spark live stream with all the data collected by another stream during its entire life cycle?
我有两个火花流,第一个是与产品相关的数据:它们对供应商的价格、货币、它们的描述、供应商 ID。这些数据按类别丰富,通过对描述和美元价格的分析进行猜测。然后将它们保存在镶木地板数据集中。
第二个流包含有关这些产品拍卖的数据,然后是它们的销售成本和日期。
鉴于一个产品今天可以到达第一个流并在一年内销售,我如何将第二个流与第一个流的镶木地板数据集中包含的所有历史记录连接起来?
要明确的结果应该是每个价格区间的日均收益...
如果您在 Spark 中利用结构化流,那么您可以将第一个流的 parquet 文件加载到数据帧中。
parquetFileDF = spark.read.parquet("products.parquet")
然后您可以获得第二个流并加入 parquet 文件。
streamingDF = spark.readStream. ...
streamingDF.join(parquetFileDF, "type", "right_join")
甚至您也可以将第一个流加入第二个流。
希望,这对您有所帮助。
我使用 snappydata 的可变 DataFrame 找到了一个可能的解决方案:
https://www.snappydata.io/blog/how-mutable-dataframes-improve-join-performance-spark-sql
报告的示例与 claudio-dalicandro
描述的示例非常相似
我有两个火花流,第一个是与产品相关的数据:它们对供应商的价格、货币、它们的描述、供应商 ID。这些数据按类别丰富,通过对描述和美元价格的分析进行猜测。然后将它们保存在镶木地板数据集中。
第二个流包含有关这些产品拍卖的数据,然后是它们的销售成本和日期。
鉴于一个产品今天可以到达第一个流并在一年内销售,我如何将第二个流与第一个流的镶木地板数据集中包含的所有历史记录连接起来?
要明确的结果应该是每个价格区间的日均收益...
如果您在 Spark 中利用结构化流,那么您可以将第一个流的 parquet 文件加载到数据帧中。
parquetFileDF = spark.read.parquet("products.parquet")
然后您可以获得第二个流并加入 parquet 文件。
streamingDF = spark.readStream. ...
streamingDF.join(parquetFileDF, "type", "right_join")
甚至您也可以将第一个流加入第二个流。
希望,这对您有所帮助。
我使用 snappydata 的可变 DataFrame 找到了一个可能的解决方案:
https://www.snappydata.io/blog/how-mutable-dataframes-improve-join-performance-spark-sql
报告的示例与 claudio-dalicandro
描述的示例非常相似