使用 Python 的 Spark Streaming 1.6.0 EMR:ClassNotFoundException:org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper

Spark Streaming 1.6.0 EMR using Python : ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper

我正在 运行使用 AWS 上的 Spark 1.6.0 和 Zeppelin 0.5.6 创建一个开箱即用的 EMR 集群。我的目标是初始化一个简单的 Spark Streaming 上下文并指向一个内部 Kinesis 流,作为概念验证。但是,当我 运行 它时,我得到:

Py4JJavaError: An error occurred while calling o89.loadClass. : 
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

我 运行ning(通过 Zeppelin)的代码很简单:

%pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

ssc = StreamingContext(sc, 1)

appName = '{my-app-name}'
streamName = '{my-stream-name}'
endpointUrl = '{my-endpoint}'
regionName = '{my-region}'

lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)

当我 运行 在本地进行此操作时,我确保从源代码构建 spark-streaming-kinesis-asl 并将这些 jar 包含在我的 spark 配置中:

spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/*

但是,我似乎无法在使用 EMR 时使用它。为了安全起见,我将其包含在以下内容中,但无济于事:

spark.driver.extraClassPath
spark.driver.extraLibraryPath
spark.executor.extraClassPath
spark.executor.extraLibraryPath

有没有人运行以前参与过这个?当我重新启动上下文以确认这些更改正在被拾取时,我正在打印出 spark 配置。也许这也需要在从属节点上完成?或者完全是另一个配置 option/key?

将依赖项添加到 zeppelin 上下文 "z"。这是添加 spark csv 包的示例

%dep
z.load("com.databricks:spark-csv_2.11:1.3.0")