Pyspark StreamingQueryException local using query.awaitTermination() - local netcat stream combined with Pyspark app on jupyter notebook
I just tried a basic piece of code from the pyspark (version 3.0.1) streaming examples, using Python 3.9.x on my Lubuntu 20.04 LTS system.
I opened a new jupyter notebook in Google Chrome and started with the following code (the part that does not throw an error yet):
# Import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
#%%
## SCRIPT
# Instantiate the new spark session
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
# Call stream with sending messages to local host with standard port "9999"
# --> loads socket
lines = spark.readStream.format("socket").option("host", "local").option("port", 9999).load()
# Create dataframe
# NOTE on several methods employed here (hover over them for docs)
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# Create word counter
wordCounts = words.groupBy("word").count()
# Print result
print(wordCounts.printSchema())
# Create output mode for the stream (hover over functions for docs)
om = wordCounts.writeStream.outputMode("complete")
# Create query for the output-mode
# NOTE on output format: can also be "json" if further processing is needed
query = om.format("console").start()
Console output:
[I 18:21:45.684 NotebookApp] Kernel restarted: f55c9433-3ae9-45f0-b34b-a1123e2899b0
[I 18:21:45.719 NotebookApp] Restoring connection for f55c9433-3ae9-45f0-b34b-a1123e2899b0:4417bfd825454f4790078827ccc529df
[I 18:21:45.720 NotebookApp] Replaying 3 buffered messages
[I 18:21:48.797 NotebookApp] Saving file at /Untitled.ipynb
20/12/20 18:21:57 WARN Utils: Your hostname, andylu-Lubuntu-PC resolves to a loopback address: 127.0.1.1; using 192.168.1.98 instead (on interface wlp3s0)
20/12/20 18:21:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/andylu/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/12/20 18:21:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/12/20 18:22:04 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
20/12/20 18:22:07 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0843cc22-4f7c-4b2e-a6ef-3ba5aa16ec08. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
20/12/20 18:22:08 ERROR MicroBatchExecution: Query [id = c5d1875b-7c4e-4ff2-a922-af871b311812, runId = 79c0726b-4457-4c3d-b81f-04f36bc8eedd] terminated with error
java.net.UnknownHostException: local
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:220)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/java.net.Socket.connect(Socket.java:558)
at java.base/java.net.Socket.<init>(Socket.java:454)
at java.base/java.net.Socket.<init>(Socket.java:231)
at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.initialize(TextSocketMicroBatchStream.scala:71)
at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.planInputPartitions(TextSocketMicroBatchStream.scala:117)
at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions$lzycompute(MicroBatchScanExec.scala:44)
at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions(MicroBatchScanExec.scala:44)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:61)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:60)
at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:29)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:91)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan(QueryExecution.scala:107)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:563)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:223)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
Next, the problematic line gets executed:
query.awaitTermination()
It throws the following exception:
StreamingQueryException Traceback (most recent call last)
<ipython-input-2-885fef5a9f37> in <module>
----> 1 query.awaitTermination()
~/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pyspark/sql/streaming.py in awaitTermination(self, timeout)
101 return self._jsq.awaitTermination(int(timeout * 1000))
102 else:
--> 103 return self._jsq.awaitTermination()
104
105 @property
~/.pyenv/versions/3.9.0/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
~/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
~/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pyspark/sql/utils.py in raise_from(e)
StreamingQueryException: local
=== Streaming Query ===
Identifier: [id = c5d1875b-7c4e-4ff2-a922-af871b311812, runId = 79c0726b-4457-4c3d-b81f-04f36bc8eedd]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketV2[host: local, port: 9999]: -1}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Aggregate [word#3], [word#3, count(1) AS count#7L]
+- Project [word#3]
+- Generate explode(split(value#0, , -1)), false, [word#3]
+- StreamingDataSourceV2Relation [value#0], org.apache.spark.sql.execution.streaming.sources.TextSocketTable$$anon@14d00623, TextSocketV2[host: local, port: 9999]
Actually, I should be able to feed this stream like this (opening another terminal instance):
nc -lk 9999
There, it should be possible to type in e.g. "Hello Andreas" and get the word-count output in the jupyter notebook console that holds the stream. However, I cannot find a way around this error.
EDIT - additional things I tried:
First, I changed the host name from "local" to "localhost", since that seems to be the accepted standard term.
Next, as @Mazahir Hussain suggested below, I tried the following (replacing lines with wordCounts, since that is my target):
query = wordCounts \
.writeStream \
.outputMode("append") \
.format("console") \
.option("checkpointLocation", "/tmp/dtn2/checkpoint")\
.start()
However, the "append" output mode throws the following exception:
AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [word#3], [word#3, count(1) AS count#7L]
+- Project [word#3]
+- Generate explode(split(value#0, , -1)), false, [word#3]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider@167d70f6, socket, org.apache.spark.sql.execution.streaming.sources.TextSocketTable@695149a6, org.apache.spark.sql.util.CaseInsensitiveStringMap@b832311b, [value#0]
So I changed the output mode from "append" to "complete" to avoid that error.
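(Aside, not part of my original attempt: "append" mode with a streaming aggregation generally requires an event-time watermark, which in turn needs a timestamp column and typically a windowed aggregation. A minimal sketch of that variant, assuming the socket source's includeTimestamp option, could look roughly like this:)
# Hedged sketch of the watermark-based alternative (not the code actually used above)
from pyspark.sql.functions import explode, split, window

lines_ts = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .option("includeTimestamp", "true") \
    .load()  # columns: value, timestamp

words_ts = lines_ts.select(explode(split(lines_ts.value, " ")).alias("word"), lines_ts.timestamp)

windowedCounts = words_ts \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(words_ts.timestamp, "10 minutes", "5 minutes"), words_ts.word) \
    .count()

# With a watermark and a windowed aggregation, "append" is accepted;
# rows are only emitted once the watermark passes the end of their window.
query = windowedCounts.writeStream.outputMode("append").format("console").start()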
However, when executing query.awaitTermination(), another error was thrown:
StreamingQueryException: Connection refused (Connection refused)
=== Streaming Query ===
Identifier: [id = c5843d59-0f89-4974-a694-9f9ae36cf4fe, runId = 5d43c468-ce55-4f8c-b41a-d60f6e053ade]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketV2[host: localhost, port: 9999]: -1}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Aggregate [word#3], [word#3, count(1) AS count#7L]
+- Project [word#3]
+- Generate explode(split(value#0, , -1)), false, [word#3]
+- StreamingDataSourceV2Relation [value#0], org.apache.spark.sql.execution.streaming.sources.TextSocketTable$$anon@5914e8ff, TextSocketV2[host: localhost, port: 9999]
Ultimately, @Mazahir Hussain's suggestion to add .option("checkpointLocation", "/tmp/dtn2/checkpoint") was not the solution.
Create a checkpoint directory under /tmp/ and then set that path. For example, I created the directories "dtn2" and "checkpoint":
query = lines \
.writeStream \
.outputMode("append") \
.format("console") \
.option("checkpointLocation", "/tmp/dtn2/checkpoint")\
.start()
query.awaitTermination()
NOTE: you have to add a checkpoint location in your code.
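Alternatively (a sketch, not part of the original answer), a default checkpoint location can be set once on the session via the spark.sql.streaming.checkpointLocation config, so the per-query option becomes optional:
from pyspark.sql import SparkSession

# Sketch: set a session-wide default checkpoint directory
# (assumes /tmp/dtn2/checkpoint already exists)
spark = SparkSession.builder \
    .appName("StreamingDemo") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/dtn2/checkpoint") \
    .getOrCreate()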
To begin with, open the code as a jupyter notebook from a new UNIX terminal:
jupyter notebook "scriptname.ipynb"
Next, run the following code inside it:
# Import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
#%%
## SCRIPT
# Instantiate the new spark session
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
# * Call stream with sending messages to local host with standard port "9999"
# --> loads socket
# NOTE on hostname: use "localhost", the standard name for your local machine ("local" does not resolve, see the PS below)
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# Create dataframe
# NOTE on several methods employed here (hover over them for docs)
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# Create word counter
# NOTE: store in a new dataframe
wordCounts = words.groupBy("word").count()
# * Print results
print(wordCounts.printSchema())
# NOTE on printing out the head: it doesn't work here, but throws the following AnalysisException:
# "method: AnalysisException: Queries with streaming sources must be executed with writeStream.start();;socket"
# --> this seems to function after initiating an actual streaming query
#print(wordCounts.head(5))
Now, open a UNIX terminal and start a netcat stream connection like this:
nc -lk 9999
Type in some words you want to stream there, each followed by Enter, e.g.:
hallo world
blabla
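(Optional, not in the original steps: back in the notebook, before starting the query, you can check that something is actually listening on localhost:9999 - an absent listener is exactly what the "Connection refused" error earlier was about. A minimal probe using Python's standard socket module:)
import socket

# Assumes netcat is listening on localhost:9999 (started with "nc -lk 9999")
try:
    with socket.create_connection(("localhost", 9999), timeout=5):
        print("listener reachable on localhost:9999")
except ConnectionRefusedError:
    print("nothing is listening on localhost:9999 - start netcat first")
Since nc was started with -k, closing this probe connection does not stop the listener.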
Next, return to the jupyter notebook and run the last piece of code in order to start the query:
# * Create query for the stream * #
# NOTE on output format: can also be "json" or "memory" if further processing is needed
# NOTE on options "append" and "complete":
# - complete: doesn't need unique counts, e.g. typing in Hello Andreas and then Hello Rikkert counts Hello twice
# - append: only new elements will be considered
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
Now, the other open terminal session (the one the jupyter notebook was launched from) should update like this:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| word|count|
+------+-----+
| hello| 1|
|blabla| 1|
| world| 1|
+------+-----+
You can type in whatever you like, and the batch word counts in the jupyter notebook terminal session will update accordingly.
To end the process, first press CTRL + C in the netcat terminal session, then interrupt the kernel (stop the running cell) in the jupyter notebook session.
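Alternatively (a sketch on top of the above, using the standard StreamingQuery API), the query can be stopped from the notebook itself instead of blocking on awaitTermination() indefinitely:
# Wait at most 60 seconds, then stop the query explicitly if it is still running
finished = query.awaitTermination(60)   # returns True if the query terminated within the timeout
if not finished:
    query.stop()
print(query.status)                     # shows e.g. whether the query is still active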
In conclusion, it is just a matter of knowing when to start the stream via netcat, and that the jupyter notebook has to be launched from a separate UNIX terminal so that that terminal displays the interactive batch updates whenever words are typed into the netcat session.
PS on the local host name:
When using "local" instead of "localhost", the following exception is thrown, so make sure to call your host "localhost" in this case:
StreamingQueryException: local
=== Streaming Query ===
Identifier: [id = d4226889-efd8-4992-86e0-2064e7fd45ae, runId = 7b626410-d764-4b53-a8ad-1850b6f0ddd0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketV2[host: local, port: 9999]: -1}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Aggregate [word#3], [word#3, count(1) AS count#7L]
+- Project [word#3]
+- Generate explode(split(value#0, , -1)), false, [word#3]
+- StreamingDataSourceV2Relation [value#0], org.apache.spark.sql.execution.streaming.sources.TextSocketTable$$anon@262f8557, TextSocketV2[host: local, port: 9999]
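(For what it's worth, the underlying java.net.UnknownHostException can be reproduced outside Spark: on a typical setup the name "local" simply does not resolve to an address, while "localhost" does. A quick check with Python's standard socket module:)
import socket

print(socket.gethostbyname("localhost"))     # usually 127.0.0.1
try:
    socket.gethostbyname("local")
except socket.gaierror as err:               # typically "Name or service not known"
    print("'local' does not resolve:", err)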