How to calculate or manage streaming data in Pyspark

I want to compute values from streaming data and then send them to a web page. For example, I want to compute the sum of the TotalSales column of the streaming data, but it fails at summary = dataStream.select('TotalSales').groupby().sum().toPandas(). Here is my code.

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
schema = StructType().add("_c0", "integer").add("InvoiceNo", "string").add("Quantity","integer").add("InvoiceDate","date").add("UnitPrice","integer").add("CustomerID","double").add("TotalSales","integer")
INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"
dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)
query = dataStream.writeStream.format("console").start()

summary = dataStream.select('TotalSales').groupby().sum().toPandas()  # <-- fails here
print(query.id)
query.awaitTermination()

This is the error shown on the command line.

Traceback (most recent call last):
  File "testStreaming.py", line 12, in <module>
    dataStream = dataStream.toPandas()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 2150, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[C:/Users/HP/Desktop/test/jsonFile]'

Thank you for your answers.

Why are you creating a pandas DataFrame?

toPandas creates a DataFrame that is local to your driver node. I am not sure what you are trying to achieve here: a pandas DataFrame represents a fixed set of tuples, whereas a structured stream is a continuous flow of data.
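Before getting to a file-based solution: a quick way to take a point-in-time snapshot of the running aggregate is the memory sink, which keeps the complete-mode result in an in-memory table that ordinary batch queries, and hence toPandas, can read. This is a minimal sketch of that idea, not code from the question; the table name sales_summary is a hypothetical choice.

summary = dataStream.select('TotalSales').groupBy().sum()
query = summary.writeStream \
    .format("memory") \
    .queryName("sales_summary") \
    .outputMode("complete") \
    .start()

# The memory sink registers an in-memory table named after queryName.
# Each call below sees the latest aggregate; it is a batch query over a
# fixed set of rows, so toPandas() is allowed here.
snapshot = spark.sql("SELECT * FROM sales_summary").toPandas()

Note that the memory sink collects the result to the driver, so it only suits small results like this single-row sum.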

Now, a possible solution to this problem is to run the whole computation you want, write the output to a parquet/csv file, and then use that parquet/csv file to create a pandas DataFrame.

summary = dataStream.select('TotalSales').groupBy().sum()
# Write the aggregated result (not the raw stream) out as parquet.
query = summary.writeStream.format("parquet").outputMode("complete").start(outputPathDir)
query.awaitTermination()
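One caveat about the snippet above: Spark's file sinks only support append output mode, so writing a complete-mode aggregation straight to parquet will be rejected. A minimal workaround sketch, assuming Spark 2.4+ and using hypothetical paths, is foreachBatch, which hands each trigger's full, updated result to ordinary batch code that overwrites a parquet snapshot; that snapshot is then a fixed set of rows that toPandas can legally read.

OUTPUT_DIRECTORY = "C:/Users/HP/Desktop/test/output"  # hypothetical path

def write_snapshot(batch_df, batch_id):
    # batch_df is an ordinary (non-streaming) DataFrame holding the full,
    # updated aggregate for this trigger, so batch writers work here.
    batch_df.write.mode("overwrite").parquet(OUTPUT_DIRECTORY)

summary = dataStream.select('TotalSales').groupBy().sum()
query = summary.writeStream \
    .outputMode("complete") \
    .foreachBatch(write_snapshot) \
    .start()

# Elsewhere (e.g. in the web backend): the parquet snapshot is a fixed
# set of rows, so toPandas() is legal on it.
pdf = spark.read.parquet(OUTPUT_DIRECTORY).toPandas()
print(pdf)

In a real deployment the read would run in a separate process, and the overwrite/read race would need handling, for example by writing to a temporary directory and renaming.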