如何通过增加 spark 的内存来解决 pyspark `org.apache.arrow.vector.util.OversizedAllocationException` 错误?
How to solve pyspark `org.apache.arrow.vector.util.OversizedAllocationException` error by increasing spark's memory?
我 运行 在 pyspark
找到一份工作,我曾在那里使用 grouped aggregate Pandas UDF。这导致以下(此处缩写)错误:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
我相当确定这是因为 pandas UDF 接收的其中一个组很大,如果我减少数据集并删除足够多的行,我可以毫无问题地 运行 我的 UDF .但是,我想 运行 使用我的原始数据集,即使我 运行 在具有 192.0 GiB RAM 的机器上执行此 spark 作业,我仍然会遇到相同的错误。 (而且 192.0 GiB 应该足以在内存中保存整个数据集。)
如何给 spark 足够的内存来 运行 分组聚合 Pandas 需要大量内存的 UDF?
例如,是否有一些我遗漏的 spark 配置可以为 apache arrow 提供更多内存?
更长的错误消息
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
----> 1 device_attack_result.count()
2
3
4
/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
520 2
521 """
--> 522 return int(self._jdf.count())
523
524 @ignore_unicode_prefix
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...
据我了解,在应用该函数之前,组的所有数据都已加载到内存中。这可能会导致内存不足异常,尤其是在组大小倾斜的情况下。 maxRecordsPerBatch 的配置不适用于组,您需要确保分组的数据适合可用内存。
您可以尝试对数据进行加盐以确保组不偏斜。请参阅下面的文章,其中讨论了连接的加盐。同样的概念也可以应用在这里
https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
- 您是否尝试过将
--executor-memory
spark-submit 选项设置为 180g
,以便 Spark 使用所有可用内存?
- 实际上,Spark 看起来不像是 OOMing 或典型的数据倾斜问题。当您的一个数据结构达到 Apache Arrow 内部限制时,这看起来像是一种相当奇怪的情况——缓冲区的大小不能大于 Integer.MAX_VALUE 字节:https://github.com/apache/arrow/blob/157b179812adb8f29e5966682ff1937f85ce192a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java#L42。我不确定 Arrow 是如何工作的,但对我来说,看起来你的一个数据点包含超过 4Gbs 的数据
Spark 的 PandasUDF 功能使用 Arrow 框架将 spark DataFrame 转换为 pandas DataFrame,此时 Arrow 内部缓冲区限制仅为 2GB,因此您的 pandasUDF group by condition 不应产生超过 2 GB 的未压缩数据。
df.groupby('id').apply(function)
我是说
you can run your pandas UDF method only if your group by partition
size is less than 2 GB uncompressed
这是票,供您参考
https://issues.apache.org/jira/browse/ARROW-4890
以上问题似乎在 >= 0.15 版本的 pyarrow 中得到解决,只有 Spark 3.x 使用 pyarrow 0.15 版本
Arrow 0.16 已将最大缓冲区分配大小从 MaxInteger 更改为 MaxLong(64 位)
https://issues.apache.org/jira/browse/ARROW-6112
截至 2020 年 7 月,上游 Spark 仍基于 Arrow 0.15
https://github.com/apache/spark/blob/master/python/setup.py
虽然 Netty 后备缓冲区仍然不支持这个..所以你仍然会遇到这个问题作为一个不同的异常。
所以到目前为止,由于上述限制,这仍然是不可能的。
这可能会在 Spark 端得到修复
https://issues.apache.org/jira/browse/SPARK-32294
想法是将 GroupedData 分批输入 pandas UDF 以解决此问题。
更新:Databricks 平台上的 PySpark 没有这个问题。需要 DBR7.4+
我 运行 在 pyspark
找到一份工作,我曾在那里使用 grouped aggregate Pandas UDF。这导致以下(此处缩写)错误:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
我相当确定这是因为 pandas UDF 接收的其中一个组很大,如果我减少数据集并删除足够多的行,我可以毫无问题地 运行 我的 UDF .但是,我想 运行 使用我的原始数据集,即使我 运行 在具有 192.0 GiB RAM 的机器上执行此 spark 作业,我仍然会遇到相同的错误。 (而且 192.0 GiB 应该足以在内存中保存整个数据集。)
如何给 spark 足够的内存来 运行 分组聚合 Pandas 需要大量内存的 UDF?
例如,是否有一些我遗漏的 spark 配置可以为 apache arrow 提供更多内存?
更长的错误消息
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
----> 1 device_attack_result.count()
2
3
4
/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
520 2
521 """
--> 522 return int(self._jdf.count())
523
524 @ignore_unicode_prefix
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...
据我了解,在应用该函数之前,组的所有数据都已加载到内存中。这可能会导致内存不足异常,尤其是在组大小倾斜的情况下。 maxRecordsPerBatch 的配置不适用于组,您需要确保分组的数据适合可用内存。
您可以尝试对数据进行加盐以确保组不偏斜。请参阅下面的文章,其中讨论了连接的加盐。同样的概念也可以应用在这里
https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
- 您是否尝试过将
--executor-memory
spark-submit 选项设置为180g
,以便 Spark 使用所有可用内存? - 实际上,Spark 看起来不像是 OOMing 或典型的数据倾斜问题。当您的一个数据结构达到 Apache Arrow 内部限制时,这看起来像是一种相当奇怪的情况——缓冲区的大小不能大于 Integer.MAX_VALUE 字节:https://github.com/apache/arrow/blob/157b179812adb8f29e5966682ff1937f85ce192a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java#L42。我不确定 Arrow 是如何工作的,但对我来说,看起来你的一个数据点包含超过 4Gbs 的数据
Spark 的 PandasUDF 功能使用 Arrow 框架将 spark DataFrame 转换为 pandas DataFrame,此时 Arrow 内部缓冲区限制仅为 2GB,因此您的 pandasUDF group by condition 不应产生超过 2 GB 的未压缩数据。
df.groupby('id').apply(function)
我是说
you can run your pandas UDF method only if your group by partition size is less than 2 GB uncompressed
这是票,供您参考
https://issues.apache.org/jira/browse/ARROW-4890
以上问题似乎在 >= 0.15 版本的 pyarrow 中得到解决,只有 Spark 3.x 使用 pyarrow 0.15 版本
Arrow 0.16 已将最大缓冲区分配大小从 MaxInteger 更改为 MaxLong(64 位) https://issues.apache.org/jira/browse/ARROW-6112
截至 2020 年 7 月,上游 Spark 仍基于 Arrow 0.15 https://github.com/apache/spark/blob/master/python/setup.py
虽然 Netty 后备缓冲区仍然不支持这个..所以你仍然会遇到这个问题作为一个不同的异常。
所以到目前为止,由于上述限制,这仍然是不可能的。
这可能会在 Spark 端得到修复 https://issues.apache.org/jira/browse/SPARK-32294 想法是将 GroupedData 分批输入 pandas UDF 以解决此问题。
更新:Databricks 平台上的 PySpark 没有这个问题。需要 DBR7.4+