Spark 1.6 kafka streaming on dataproc py4j 错误
Spark 1.6 kafka streaming on dataproc py4j error
我收到以下错误:
Py4JError(u'An error occurred while calling o73.createDirectStreamWithoutMessageHandler. Trace:\npy4j.Py4JException: Method createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)\n\tat py4j.Gateway.invoke(Gateway.java:252)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n\n',)
我正在使用 spark-streaming-kafka-assembly_2.10-1.6.0.jar(它存在于我所有节点 + master 的 /usr/lib/hadoop/lib/ 文件夹中)
(编辑)
实际错误是:java.lang.NoSuchMethodError: org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava/lang/String;)Ljava/lang/String;
这是由于错误的 hadoop 版本。因此 spark 应该使用正确的 hadoop 版本编译:
mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package
这将在 external/kafka-assembly/target 文件夹中生成一个 jar。
使用映像版本 1,我已成功 运行 pyspark 流式传输 / kafka example wordcount
在每个示例中,"ad-kafka-inst" 是我的带有 'test' 主题的测试 kafka 实例。
使用没有初始化操作的集群:
$ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test
对完整的 kafka 程序集使用初始化操作:
- 下载/解压缩 spark-1.6。0.tgz
构建方式:
$ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package
- 上传 spark-streaming-kafka-assembly_2.10-1.6.0.jar 到新的 GCS 存储桶(例如 MYBUCKET)。
在同一个 GCS 存储桶中创建以下初始化操作(例如 gs://MYBUCKET/install_spark_kafka.sh):
$ #!/bin/bash
gsutil cp gs://MY_BUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/
chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar
用上面的初始化动作启动一个集群:
$ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh
开始流式统计字数:
$ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test
我收到以下错误:
Py4JError(u'An error occurred while calling o73.createDirectStreamWithoutMessageHandler. Trace:\npy4j.Py4JException: Method createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)\n\tat py4j.Gateway.invoke(Gateway.java:252)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n\n',)
我正在使用 spark-streaming-kafka-assembly_2.10-1.6.0.jar(它存在于我所有节点 + master 的 /usr/lib/hadoop/lib/ 文件夹中)
(编辑) 实际错误是:java.lang.NoSuchMethodError: org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava/lang/String;)Ljava/lang/String;
这是由于错误的 hadoop 版本。因此 spark 应该使用正确的 hadoop 版本编译:
mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package
这将在 external/kafka-assembly/target 文件夹中生成一个 jar。
使用映像版本 1,我已成功 运行 pyspark 流式传输 / kafka example wordcount
在每个示例中,"ad-kafka-inst" 是我的带有 'test' 主题的测试 kafka 实例。
使用没有初始化操作的集群:
$ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test
对完整的 kafka 程序集使用初始化操作:
- 下载/解压缩 spark-1.6。0.tgz
构建方式:
$ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package
- 上传 spark-streaming-kafka-assembly_2.10-1.6.0.jar 到新的 GCS 存储桶(例如 MYBUCKET)。
在同一个 GCS 存储桶中创建以下初始化操作(例如 gs://MYBUCKET/install_spark_kafka.sh):
$ #!/bin/bash gsutil cp gs://MY_BUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/ chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar
用上面的初始化动作启动一个集群:
$ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh
开始流式统计字数:
$ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test