How to display a streaming DataFrame (as show fails with AnalysisException)?
So I have some streaming data in a Kafka topic. I'm reading this streaming data into a DataFrame, and I want to display the data inside the DataFrame:
import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'
topic_name = "my-topic"
kafka_broker = "localhost:9092"
producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)
while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
readDF.writeStream.format("console").start()
readDF.show()
producer.close()
But I keep getting this error:
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
  File "test2.py", line 30, in <module>
    readDF.show()
  File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
I don't understand why the exception occurs, since I do call writeStream.start() before show(). I tried getting rid of selectExpr(), but that made no difference. Does anyone know how to display a streaming-source DataFrame? I'm using Python 3.6.1, Kafka 0.10.2.1 and Spark 2.2.0.
A streaming DataFrame doesn't support the show() method. When you call start(), it launches a background thread that streams the input data to the sink, and since you are using ConsoleSink, it writes the data to the console. You don't need to call show().
Remove readDF.show(), add a sleep, and you should then see the data in the console, e.g.:
query = readDF.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()
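If you'd rather not hard-code a plain sleep, PySpark's StreamingQuery.awaitTermination also accepts a timeout in seconds, blocking the driver up to that long while the background thread keeps printing micro-batches. A small variation on the snippet above, under the same assumptions:

query = readDF.writeStream.format("console").start()
# Block for up to 10 seconds (returns early if the query terminates),
# then stop the streaming query explicitly.
query.awaitTermination(10)
query.stop()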
You also need to set startingOffsets to earliest; otherwise, the Kafka source will start from the latest offsets and, in your case, fetch nothing.
readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
A streaming DataFrame doesn't support the show() method directly, but there is a workaround: let the background thread run for a while, then call show() on the temp table that a memory sink creates. Here is the PySpark way of using show().
See my answer here.
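A minimal sketch of that memory-sink workaround, assuming the same spark session and readDF as above (the query name read_df_table is made up for illustration; this is my reading of the approach, not the linked answer verbatim):

# Write the stream into an in-memory table registered under queryName.
memory_query = readDF.writeStream \
    .queryName("read_df_table") \
    .format("memory") \
    .start()

time.sleep(10)  # give the stream time to pull a few micro-batches

# The memory sink exposes a temp table named after the query,
# so an ordinary batch query (and show()) works against it.
spark.sql("SELECT key, value FROM read_df_table").show(truncate=False)
memory_query.stop()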