PySpark Streaming 示例似乎没有终止
PySpark Streaming example does not seem to terminate
我试图通过一个简单的例子来理解 Spark Streaming 的 Python API。
from pyspark.streaming import StreamingContext
dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)
def get_output(rdd):
print(rdd.collect())
input_stream.foreachRDD(get_output)
ssc.start()
这会打印所需的输出,但最后会打印很多空列表并且不会终止。谁能告诉我我可能哪里出错了。
在大多数情况下(除非您的代码中的条件终止)流应该是无限的。流应用程序的目的是使用定期传入的数据。因此,在处理完前 4 个 RDD (i.e. [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]])
后,队列中没有任何内容,而 Spark Streaming 建立在新事物可能进入 queueStream
的概念之上
如果您正在进行一次性 ETL,您可能会考虑放弃流式处理。
我试图通过一个简单的例子来理解 Spark Streaming 的 Python API。
from pyspark.streaming import StreamingContext
dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)
def get_output(rdd):
print(rdd.collect())
input_stream.foreachRDD(get_output)
ssc.start()
这会打印所需的输出,但最后会打印很多空列表并且不会终止。谁能告诉我我可能哪里出错了。
在大多数情况下(除非您的代码中的条件终止)流应该是无限的。流应用程序的目的是使用定期传入的数据。因此,在处理完前 4 个 RDD (i.e. [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]])
后,队列中没有任何内容,而 Spark Streaming 建立在新事物可能进入 queueStream
如果您正在进行一次性 ETL,您可能会考虑放弃流式处理。