AWS EMR - ModuleNotFoundError: No module named 'pyarrow'
AWS EMR - ModuleNotFoundError: No module named 'pyarrow'
我 运行 通过 Apache Arrow Spark Integration 解决了这个问题。
使用带 Spark 2.4.3 的 AWS EMR
在本地 spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。
在 spark-env.sh
中设置这些
export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
在 spark 中确认了这一点 shell
spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
运行 基本 pandas_udf 与 apache 箭头集成导致错误
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
aws emr 上的错误[在 cloudera 和本地机器上没有错误]
ModuleNotFoundError: No module named 'pyarrow'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:291)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:283)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
有人知道发生了什么事吗?一些可能的想法...
PYTHONPATH 是否会导致问题,因为我没有使用 anaconda
?
是否与Spark版本和Arrow版本有关?
这是最奇怪的事情,因为我在所有 3 个平台 [本地桌面、cloudera、emr] 中使用相同的版本,只有 EMR 不工作...
我登录了所有 4 个 EMR EC2 数据节点并测试了我可以导入pyarrow
,它工作得很好,但在尝试将它与 spark
[= 一起使用时却不行43=]
# test
import numpy as np
import pandas as pd
import pyarrow as pa
df = pd.DataFrame({'one': [20, np.nan, 2.5],'two': ['january', 'february', 'march'],'three': [True, False, True]},index=list('abc'))
table = pa.Table.from_pandas(df)
您的情况有两种选择:
一个是确保 python env 在每台机器上都是正确的:
将 PYSPARK_PYTHON
设置为安装了第三方模块(例如 pyarrow
)的 python 解释器。您可以使用 type -a python
来检查您的从属节点上有多少 python。
如果python解释器路径在每个节点上都一样,你可以在spark-env.sh
中设置PYSPARK_PYTHON
然后复制到每个其他节点。阅读更多:https://spark.apache.org/docs/2.4.0/spark-standalone.html
另一种选择是在 spark-submit
:
上添加参数
您必须先将额外的模块打包到 zip
或 egg
文件中。
然后输入spark-submit --py-files pyarrow.zip your_code.py
。这样,spark 将自动将您的模块传输到其他每个节点。
https://spark.apache.org/docs/latest/submitting-applications.html
希望这些对您有所帮助。
在 EMR 中 python3 默认不解析。你必须把它说清楚。一种方法是在创建集群时传递 config.json
文件。它位于 AWS EMR UI 的 Edit software settings
部分。示例 json 文件看起来像这样。
[
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
},
{
"Classification": "yarn-env",
"Properties": {},
"Configurations": [
{
"Classification": "export",
"Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
}
]
此外,您还需要在所有核心节点中安装 pyarrow
模块,而不仅仅是在主节点中。为此,您可以在 AWS 中创建集群时使用 bootstrap 脚本。同样,示例 bootstrap 脚本可以像这样简单:
#!/bin/bash
sudo python3 -m pip install pyarrow==0.13.0
我 运行 通过 Apache Arrow Spark Integration 解决了这个问题。
使用带 Spark 2.4.3 的 AWS EMR
在本地 spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。
在 spark-env.sh
中设置这些export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
在 spark 中确认了这一点 shell
spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
运行 基本 pandas_udf 与 apache 箭头集成导致错误
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
aws emr 上的错误[在 cloudera 和本地机器上没有错误]
ModuleNotFoundError: No module named 'pyarrow'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:291)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:283)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
有人知道发生了什么事吗?一些可能的想法...
PYTHONPATH 是否会导致问题,因为我没有使用 anaconda
?
是否与Spark版本和Arrow版本有关?
这是最奇怪的事情,因为我在所有 3 个平台 [本地桌面、cloudera、emr] 中使用相同的版本,只有 EMR 不工作...
我登录了所有 4 个 EMR EC2 数据节点并测试了我可以导入pyarrow
,它工作得很好,但在尝试将它与 spark
[= 一起使用时却不行43=]
# test
import numpy as np
import pandas as pd
import pyarrow as pa
df = pd.DataFrame({'one': [20, np.nan, 2.5],'two': ['january', 'february', 'march'],'three': [True, False, True]},index=list('abc'))
table = pa.Table.from_pandas(df)
您的情况有两种选择:
一个是确保 python env 在每台机器上都是正确的:
将
PYSPARK_PYTHON
设置为安装了第三方模块(例如pyarrow
)的 python 解释器。您可以使用type -a python
来检查您的从属节点上有多少 python。如果python解释器路径在每个节点上都一样,你可以在
spark-env.sh
中设置PYSPARK_PYTHON
然后复制到每个其他节点。阅读更多:https://spark.apache.org/docs/2.4.0/spark-standalone.html
另一种选择是在 spark-submit
:
您必须先将额外的模块打包到
zip
或egg
文件中。然后输入
spark-submit --py-files pyarrow.zip your_code.py
。这样,spark 将自动将您的模块传输到其他每个节点。 https://spark.apache.org/docs/latest/submitting-applications.html
希望这些对您有所帮助。
在 EMR 中 python3 默认不解析。你必须把它说清楚。一种方法是在创建集群时传递 config.json
文件。它位于 AWS EMR UI 的 Edit software settings
部分。示例 json 文件看起来像这样。
[
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
},
{
"Classification": "yarn-env",
"Properties": {},
"Configurations": [
{
"Classification": "export",
"Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
}
]
此外,您还需要在所有核心节点中安装 pyarrow
模块,而不仅仅是在主节点中。为此,您可以在 AWS 中创建集群时使用 bootstrap 脚本。同样,示例 bootstrap 脚本可以像这样简单:
#!/bin/bash
sudo python3 -m pip install pyarrow==0.13.0