当流为 运行 时,Pyspark shell 不可用

Pyspark shell unusable while stream is running

我刚开始在 Windows 10 下使用 pyspark 探索 Apache Spark(在 A gentle introduction to Apache Spark 之后)。我到了关于结构化流的章节,我在使用 cmd 时遇到了一些麻烦 - 每当我开始流时,cmd window 变得不可用,因为 Spark 保持 "typing" 东西所以即使我输入任何东西,它也会很快消失。

我的代码(直接取自书本):

from pyspark.sql.functions import window, column, desc, col

staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./data/retail-data/by-day/*.csv")

staticSchema = staticDataFrame.schema

streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("./data/retail-data/by-day/*.csv")

purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost" ,
"InvoiceDate" )\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

purchaseByCustomerPerHour.writeStream\
.format("memory")\
.option('checkpointLocation','F:/Spark/sparktmp')\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

我说的问题:

插入符号应位于 [Stage 6:======>] 所在的行。因此,如果我想查询流(如书中建议的那样),我无法做到。而且我不能只打开第二个 pyspark shell,因为那将是一个不同的 Spark 会话。我也不确定流式作业是否应该在耗尽所有输入文件时重新开始(确实如此),但我想这是另一个问题的主题。

如果我应该提供更多信息,请告诉我。提前致谢!

TL;DR; 只用notebook环境。 Jupyter Notebook (optionally with Apache Toree kernel) or Apache Zeppelin Notebook, will work just fine and won't capture the output (this might be undesired ),并允许您进行不间断的查询。

在标准 shell 中,将 spark.ui.showConsoleProgress 设置为 false 也可以有所帮助:

bin/pyspark --conf "spark.ui.showConsoleProgress=false"