在行组大小小于 100 的 spark 中创建镶木地板文件
Creating parquet files in spark with row-group size that is less than 100
我有一个包含少量字段的 spark 数据框。一些字段是巨大的二进制 blob。整行的大小约为 50 MB。
我正在将数据框保存为 parquet 格式。我正在使用 parquet.block.size
参数控制行组的大小。
Spark 将生成一个 parquet 文件,但我总是会在一个行组中得到至少 100 行。这对我来说是个问题,因为块大小可能会变成千兆字节,这对我的应用程序来说效果不佳。
parquet.block.size
只要大小足以容纳超过 100 行,就可以正常工作。
我将 InternalParquetRecordWriter.java 修改为 MINIMUM_RECORD_COUNT_FOR_CHECK = 2
,这解决了问题,但是,我找不到任何配置值可以支持调整这个硬编码常量。
是否有 different/better 方法来获取小于 100 的行组大小?
这是我的代码片段:
from pyspark import Row
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import StructType, StructField, BinaryType
def fake_row(x):
result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
return Row(result, result)
spark_session = SparkSession \
.builder \
.appName("bbox2d_dataset_extraction") \
.config("spark.driver.memory", "12g") \
.config("spark.executor.memory", "4g")
spark_session.master('local[5]')
spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)
index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
write.option("compression", "none"). \
mode('overwrite'). \
parquet('/tmp/huge/')
很遗憾,我还没有找到这样做的方法。我报告 this issue 删除硬编码值并使它们可配置。如果你有兴趣,我有一个补丁。
虽然 PARQUET-409 尚未修复,但有几个解决方法可以使应用程序使用 100
每行组的硬编码最小记录数。
第一个问题和解决方法:
您提到您的行可能大至 50Mb。
这给出了大约 5Gb 的行组大小。
同时,您的 spark 执行程序只有 4Gb (spark.executor.memory
)。
使其明显大于最大行组大小。
对于 spark.executor.memory
,我建议使用 12-20Gb 的大型 spark 执行器内存。尝试一下,看看哪个适用于您的数据集。
我们的大多数生产作业 运行 都具有此范围内的 spark 执行程序内存。
为了使其适用于如此大的行组,您可能还需要将 spark.executor.cores
调低至 1 以确保每个执行程序进程一次只占用一个如此大的行组。 (以损失一些 Spark 效率为代价)也许尝试将 spark.executor.cores
设置为 2 - 这可能需要将 spark.executor.memory
增加到 20-31Gb 范围。 (尝试保持 under 32Gb,因为 jvm 切换到非压缩 OOP,这可能有高达 50% 的内存开销)
第二个问题和解决方法:如此大的 5Gb 行块很可能分布在许多 HDFS 块中,因为默认 HDFS 块在 128-256Mb 范围内。 (我假设你使用 HDFS 来存储那些镶木地板文件,因为你有“hadoop”标签)Parquet best practice 是一个行组完全驻留在一个 HDFS 块中:
Row group size: Larger row groups allow for larger column chunks which
makes it possible to do larger sequential IO. Larger groups also
require more buffering in the write path (or a two pass write). We
recommend large row groups (512MB - 1GB). Since an entire row group
might need to be read, we want it to completely fit on one HDFS block.
Therefore, HDFS block sizes should also be set to be larger. An
optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1
HDFS block per HDFS file.
下面是如何更改 HDFS 块大小的示例(在您 创建 此类镶木地板文件之前设置):
sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")
或在 Spark Scala 中:
sc.hadoopConfiguration.set("dfs.block.size", "5g")
我希望有时这会在 Parquet 级别得到修复,但是这两个解决方法应该允许您使用 Parquet 操作如此大的行组。
我有一个包含少量字段的 spark 数据框。一些字段是巨大的二进制 blob。整行的大小约为 50 MB。
我正在将数据框保存为 parquet 格式。我正在使用 parquet.block.size
参数控制行组的大小。
Spark 将生成一个 parquet 文件,但我总是会在一个行组中得到至少 100 行。这对我来说是个问题,因为块大小可能会变成千兆字节,这对我的应用程序来说效果不佳。
parquet.block.size
只要大小足以容纳超过 100 行,就可以正常工作。
我将 InternalParquetRecordWriter.java 修改为 MINIMUM_RECORD_COUNT_FOR_CHECK = 2
,这解决了问题,但是,我找不到任何配置值可以支持调整这个硬编码常量。
是否有 different/better 方法来获取小于 100 的行组大小?
这是我的代码片段:
from pyspark import Row
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import StructType, StructField, BinaryType
def fake_row(x):
result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
return Row(result, result)
spark_session = SparkSession \
.builder \
.appName("bbox2d_dataset_extraction") \
.config("spark.driver.memory", "12g") \
.config("spark.executor.memory", "4g")
spark_session.master('local[5]')
spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)
index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
write.option("compression", "none"). \
mode('overwrite'). \
parquet('/tmp/huge/')
很遗憾,我还没有找到这样做的方法。我报告 this issue 删除硬编码值并使它们可配置。如果你有兴趣,我有一个补丁。
虽然 PARQUET-409 尚未修复,但有几个解决方法可以使应用程序使用 100
每行组的硬编码最小记录数。
第一个问题和解决方法:
您提到您的行可能大至 50Mb。
这给出了大约 5Gb 的行组大小。
同时,您的 spark 执行程序只有 4Gb (spark.executor.memory
)。
使其明显大于最大行组大小。
对于 spark.executor.memory
,我建议使用 12-20Gb 的大型 spark 执行器内存。尝试一下,看看哪个适用于您的数据集。
我们的大多数生产作业 运行 都具有此范围内的 spark 执行程序内存。
为了使其适用于如此大的行组,您可能还需要将 spark.executor.cores
调低至 1 以确保每个执行程序进程一次只占用一个如此大的行组。 (以损失一些 Spark 效率为代价)也许尝试将 spark.executor.cores
设置为 2 - 这可能需要将 spark.executor.memory
增加到 20-31Gb 范围。 (尝试保持 under 32Gb,因为 jvm 切换到非压缩 OOP,这可能有高达 50% 的内存开销)
第二个问题和解决方法:如此大的 5Gb 行块很可能分布在许多 HDFS 块中,因为默认 HDFS 块在 128-256Mb 范围内。 (我假设你使用 HDFS 来存储那些镶木地板文件,因为你有“hadoop”标签)Parquet best practice 是一个行组完全驻留在一个 HDFS 块中:
Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
下面是如何更改 HDFS 块大小的示例(在您 创建 此类镶木地板文件之前设置):
sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")
或在 Spark Scala 中:
sc.hadoopConfiguration.set("dfs.block.size", "5g")
我希望有时这会在 Parquet 级别得到修复,但是这两个解决方法应该允许您使用 Parquet 操作如此大的行组。