pyspark:使用 spark-submit 发送 jar 依赖
pyspark: ship jar dependency with spark-submit
我写了一个 pyspark 脚本,它读取两个 json 文件,coGroup
它们并将结果发送到 elasticsearch 集群;当我在本地 运行 时,一切(大部分)都按预期工作,我为 org.elasticsearch.hadoop.mr.EsOutputFormat
和 org.elasticsearch.hadoop.mr.LinkedMapWritable
类 下载了 elasticsearch-hadoop
jar 文件,然后 运行 我使用 --jars
参数在 pyspark 上工作,我可以看到文档出现在我的 elasticsearch 集群中。
当我尝试在 spark 集群上 运行 它时,我收到了这个错误:
Traceback (most recent call last):
File "/root/spark/spark_test.py", line 141, in <module>
conf=es_write_conf
File "/root/spark/python/pyspark/rdd.py", line 1302, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.util.Utils$.classForName(Utils.scala:157)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$$anonfun$apply.apply(PythonRDD.scala:611)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$$anonfun$apply.apply(PythonRDD.scala:610)
at scala.Option.map(Option.scala:145)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes.apply(PythonRDD.scala:610)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes.apply(PythonRDD.scala:609)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.api.python.PythonRDD$.getKeyValueTypes(PythonRDD.scala:609)
at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:701)
at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
这对我来说似乎很清楚:elasticsearch-hadoop
罐子在工人身上不可用;所以问题是:我如何将它与我的应用程序一起发送?我可以将 sc.addPyFile
用于 python 依赖项,但它不适用于 jars,并且使用 spark-submit
的 --jars
参数也无济于事。
--jars
正常工作;问题是我如何 运行 spark-submit
工作;正确的执行方式是:
./bin/spark-submit <options> scriptname
因此--jars
选项必须放在脚本之前:
./bin/spark-submit --jars /path/to/my.jar myscript.py
如果您认为这是将参数传递给脚本本身的唯一方法,那么这很明显,因为脚本名称之后的所有内容都将用作脚本的输入参数:
./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true
我写了一个 pyspark 脚本,它读取两个 json 文件,coGroup
它们并将结果发送到 elasticsearch 集群;当我在本地 运行 时,一切(大部分)都按预期工作,我为 org.elasticsearch.hadoop.mr.EsOutputFormat
和 org.elasticsearch.hadoop.mr.LinkedMapWritable
类 下载了 elasticsearch-hadoop
jar 文件,然后 运行 我使用 --jars
参数在 pyspark 上工作,我可以看到文档出现在我的 elasticsearch 集群中。
当我尝试在 spark 集群上 运行 它时,我收到了这个错误:
Traceback (most recent call last):
File "/root/spark/spark_test.py", line 141, in <module>
conf=es_write_conf
File "/root/spark/python/pyspark/rdd.py", line 1302, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.util.Utils$.classForName(Utils.scala:157)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$$anonfun$apply.apply(PythonRDD.scala:611)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$$anonfun$apply.apply(PythonRDD.scala:610)
at scala.Option.map(Option.scala:145)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes.apply(PythonRDD.scala:610)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes.apply(PythonRDD.scala:609)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.api.python.PythonRDD$.getKeyValueTypes(PythonRDD.scala:609)
at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:701)
at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
这对我来说似乎很清楚:elasticsearch-hadoop
罐子在工人身上不可用;所以问题是:我如何将它与我的应用程序一起发送?我可以将 sc.addPyFile
用于 python 依赖项,但它不适用于 jars,并且使用 spark-submit
的 --jars
参数也无济于事。
--jars
正常工作;问题是我如何 运行 spark-submit
工作;正确的执行方式是:
./bin/spark-submit <options> scriptname
因此--jars
选项必须放在脚本之前:
./bin/spark-submit --jars /path/to/my.jar myscript.py
如果您认为这是将参数传递给脚本本身的唯一方法,那么这很明显,因为脚本名称之后的所有内容都将用作脚本的输入参数:
./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true