pyspark saveAsTextFile 适用于 python 2.7 但不适用于 3.4

pyspark saveAsTextFile works for python 2.7 but not 3.4

我在 Amazon EMR 集群上 运行ning pyspark。我有一个非常简单的测试脚本,看看我是否可以使用 spark-submit 将数据写入 s3 ...

from pyspark import SparkContext
sc = SparkContext()
numbers = sc.parallelize(range(100))
numbers.saveAsTextFile("s3n://my-bucket/test.txt")
sc.stop()

当我 运行 这个脚本在 python2.7 环境中使用 spark-submit 时,它工作得很好。但是当我尝试在 python3.4 环境中 运行 相同的脚本时,我得到以下回溯 ...

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File ".../pyspark/worker.py", line 161, in main 
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File ".../pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
File ".../pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
File ".../pyspark/serializers.py", line 419, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'unicode' on <module 'builtins' (built-in)>

我正在使用 conda 并通过设置 PYSPARK_PYTHONPYSPARK_DRIVER_PYTHON 变量来操纵我的 python 环境。

在 python 3 中使用 saveAsTextFile 有问题吗?还是我错过了设置 python 3 环境的步骤?

谢谢!

您的 EMR 集群可能配置为使用 pyspark 2.7,而您是 运行 python 3.4,这在使用 pyspark 2.7 时可能会出现问题

以下link描述了如何配置Amazon EMR以在python 3.4

中使用spark

I know Python 3.4.3 is installed on an Amazon EMR cluster instances, but the default Python version used by Spark and other programs is Python 2.7.10. How do I change the default Python version to Python 3 and run a pyspark job?

https://aws.amazon.com/premiumsupport/knowledge-center/emr-pyspark-python-3x/


range() 函数在 Python2 和 Python3 中有不同的实现。

在Python2range()returnsa list of numbers.
在Python2range()returnsa generator

因此,当您使用 Python3 时,您提供的输入是 generator 而不是 list of numbers

有关 python2 与 python3 中 range() 之间差异的更多信息:

Python2 https://docs.python.org/2/library/functions.html#range 范围(开始,停止[,步骤])

This is a versatile function to create lists containing arithmetic progressions. It is most often used in for loops. The arguments must be plain integers. If the step argument is omitted, it defaults to 1. If the start argument is omitted, it defaults to 0. The full form returns a list of plain integers [start, start + step, start + 2 * step, ...]. If step is positive, the last element is the largest start + i * step less than stop; if step is negative, the last element is the smallest start + i * step greater than stop. step must not be zero (or else ValueError is raised).

示例:

>>> range(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Python 3 https://docs.python.org/3/library/functions.html#func-range 范围(开始,停止[,步骤])

Rather than being a function, range is actually an immutable sequence type, as documented in Ranges and Sequence Types — list, tuple, range.

>>> range(10)
range(0, 10)

试试这个

出口PYSPARK_PYTHON=python3

好的,看来这与 python3 无关,而与我的 conda 环境有关。总之,我在我的bootstrap.sh里搭建了一个conda环境,但我只是在master节点上才真正激活了它。所以主节点使用的是 conda python,而工作节点使用的是系统 python.

我现在的解决办法是设置PYSPARK_PYTHON=/home/hadoop/miniconda3/envs/myenv/python.

是否有更好的方法在工作节点上激活我的 conda 环境?