通过 Spark 创建 parquet Petastorm 数据集失败并出现溢出错误(大于 4GB)
Creating parquet Petastorm dataset through Spark fails with Overflow error (larger than 4GB)
我正在尝试实现 Uber 的 Petastorm 数据集创建,它利用 Spark 按照 Github page.
上的教程创建镶木地板文件
代码:
spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext
with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
schema=MySchema, row_group_size_mb=256):
logging.info('Building RDD...')
rows_rdd = sc.parallelize(ids)\
.map(row_generator)\ # Generator that yields lists of examples
.flatMap(lambda x: dict_to_spark_row(MySchema, x))
logging.info('Creating DataFrame...')
spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet('file:///opt/data/hello_world_dataset')
现在 RDD 代码成功执行,但只有 .createDataFrame
调用失败并出现以下错误:
_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB
这是我第一次使用 Spark,所以我无法确定此错误是源自 Spark 还是 Petastorm。
查看此错误的其他解决方案(关于 Spark,而不是 Petastorm)我发现它可能与酸洗协议有关,但我无法确认,我也没有找到改变的方法酸洗协议。
我怎样才能避免这个错误?
问题出在不同进程之间传递数据的酸洗,默认酸洗协议是2,我们需要使用4才能传递大于4GB的对象。
要更改酸洗协议,在创建 Spark 会话之前,请使用以下代码
from pyspark import broadcast
import pickle
def broadcast_dump(self, value, f):
pickle.dump(value, f, 4) # was 2, 4 is first protocol supporting >4GB
f.close()
return f.name
broadcast.Broadcast.dump = broadcast_dump
构建 bluesummers 答案
The master branch of spark right now fixes这个问题,所以我用这段代码以同样的方式修补转储功能,但更安全一点。 [使用 2.3.2 测试]
from pyspark import broadcast
from pyspark.cloudpickle import print_exec
import pickle
def broadcast_dump(self, value, f):
try:
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
except pickle.PickleError:
raise
except Exception as e:
msg = "Could not serialize broadcast: %s: %s" \
% (e.__class__.__name__, _exception_message(e))
print_exec(sys.stderr)
raise pickle.PicklingError(msg)
f.close()
broadcast.Broadcast.dump = broadcast_dump
我正在尝试实现 Uber 的 Petastorm 数据集创建,它利用 Spark 按照 Github page.
上的教程创建镶木地板文件代码:
spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext
with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
schema=MySchema, row_group_size_mb=256):
logging.info('Building RDD...')
rows_rdd = sc.parallelize(ids)\
.map(row_generator)\ # Generator that yields lists of examples
.flatMap(lambda x: dict_to_spark_row(MySchema, x))
logging.info('Creating DataFrame...')
spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
.coalesce(10) \
.write \
.mode('overwrite') \
.parquet('file:///opt/data/hello_world_dataset')
现在 RDD 代码成功执行,但只有 .createDataFrame
调用失败并出现以下错误:
_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB
这是我第一次使用 Spark,所以我无法确定此错误是源自 Spark 还是 Petastorm。
查看此错误的其他解决方案(关于 Spark,而不是 Petastorm)我发现它可能与酸洗协议有关,但我无法确认,我也没有找到改变的方法酸洗协议。
我怎样才能避免这个错误?
问题出在不同进程之间传递数据的酸洗,默认酸洗协议是2,我们需要使用4才能传递大于4GB的对象。
要更改酸洗协议,在创建 Spark 会话之前,请使用以下代码
from pyspark import broadcast
import pickle
def broadcast_dump(self, value, f):
pickle.dump(value, f, 4) # was 2, 4 is first protocol supporting >4GB
f.close()
return f.name
broadcast.Broadcast.dump = broadcast_dump
构建 bluesummers 答案
The master branch of spark right now fixes这个问题,所以我用这段代码以同样的方式修补转储功能,但更安全一点。 [使用 2.3.2 测试]
from pyspark import broadcast
from pyspark.cloudpickle import print_exec
import pickle
def broadcast_dump(self, value, f):
try:
pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
except pickle.PickleError:
raise
except Exception as e:
msg = "Could not serialize broadcast: %s: %s" \
% (e.__class__.__name__, _exception_message(e))
print_exec(sys.stderr)
raise pickle.PicklingError(msg)
f.close()
broadcast.Broadcast.dump = broadcast_dump