PySpark error: py4j.protocol.Py4JNetworkError: Answer from Java side is empty
I am using Python 3.6.7 and PySpark 2.3.0 with Spark 2.3.0 in a Jupyter notebook to pull tweets from Kafka and process them with Spark Streaming. When I run the code below:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
import logging
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")
ssc = StreamingContext(sc, 60)
logging.getLogger("py4j").setLevel(logging.ERROR)
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'kafkaspark':1})
I get the following error:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1062, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 908, in send_command
response = connection.send_command(command)
File "C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1067, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-7-a7a877501187> in <module>
----> 1 kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'kafkaspark':1})
C:\Sentiment_Analysis\spark\python\pyspark\streaming\kafka.py in createStream(ssc, zkQuorum, groupId, topics, kafkaParams, storageLevel, keyDecoder, valueDecoder)
77 jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
78 helper = KafkaUtils._get_helper(ssc._sc)
---> 79 jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
80 ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
81 stream = DStream(jstream, ssc, ser)
C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
C:\Sentiment_Analysis\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JError(
327 "An error occurred while calling {0}{1}{2}".
--> 328 format(target_id, ".", name))
329 else:
330 type = answer[1]
Py4JError: An error occurred while calling o32.createStream
I don't know how to resolve this error. I am a beginner with Spark and Kafka, so could someone explain in simple terms how to get rid of it? What should I do?
For the compatibility requirements, refer to this link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
So, to fix your problem, use a version of the Kafka integration library that is compatible with your Spark version. Your PYSPARK_SUBMIT_ARGS pulls spark-streaming-kafka-0-8_2.11:2.0.2 while you are running Spark 2.3.0; that binary mismatch crashes the JVM-side helper when createStream is called, which is why py4j reports an empty answer from the Java side.
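As a minimal sketch of the fix (assuming Spark 2.3.0 built against Scala 2.11, as in the question), the start of the script would become:

import os

# Request the Kafka 0.8 integration artifact whose version matches the
# installed Spark (2.3.0 here), instead of the mismatched 2.0.2.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 '
    'pyspark-shell'
)

Note that this environment variable must be set before the SparkContext is created, so restart the notebook kernel before re-running the cells; otherwise the already-running JVM keeps the old --packages argument.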