无法在 Delta Lake 中分配内存
Cannot Allocate Memory in Delta Lake
问题
目标是让 Spark Streaming 应用程序从 Kafka 读取数据并使用 Delta Lake 创建存储数据。 delta table 的粒度非常细,第一个分区是 organization_id(有超过 5000 个组织),第二个分区是 日期.
应用程序具有预期的延迟,但不会持续 超过一天。错误总是与内存有关,如下所示。
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006f8000000, 671088640, 0) failed; error='Cannot allocate memory' (errno=12)
没有持久化,整个应用程序的内存已经很高。
我试过的
增加内存和工作是我尝试的第一件事,但分区数量也发生了变化,从 4 到 16。
执行脚本
spark-submit \
--verbose \
--master yarn \
--deploy-mode cluster \
--driver-memory 2G \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 4 \
--files s3://my-bucket/log4j-driver.properties,s3://my-bucket/log4j-executor.properties \
--jars /home/hadoop/delta-core_2.12-0.8.0.jar,/usr/lib/spark/external/lib/spark-sql-kafka-0-10.jar \
--class my.package.app \
--conf spark.driver.memoryOverhead=512 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.rdd.compress=true \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.yarn.maxAppAttempts=100 \
--conf spark.task.maxFailures=100 \
--conf spark.executor.heartbeatInterval=20s \
--conf spark.network.timeout=300s \
--conf spark.driver.maxResultSize=0 \
--conf spark.driver.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-driver.hprof -Dlog4j.configuration=log4j-driver.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
--conf spark.executor.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-executor.hprof -Dlog4j.configuration=log4j-executor.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
--conf spark.sql.session.timeZone=UTC \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore \
--conf spark.databricks.delta.retentionDurationCheck.enabled=false \
--conf spark.databricks.delta.vacuum.parallelDelete.enabled=true \
--conf spark.sql.shuffle.partitions=16 \
--name "UsageFactProcessor" \
application.jar
代码
val source = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", value = false)
.option("fetchOffset.numRetries", 10)
.option("fetchOffset.retryIntervalMs", 1000)
.option("maxOffsetsPerTrigger", 50000L)
.option("kafkaConsumer.pollTimeoutMs", 300000L)
.load()
val transformed = source
.transform(applySchema)
val query = transformed
.coalesce(16)
.writeStream
.trigger(Trigger.ProcessingTime("1 minute"))
.outputMode(OutputMode.Append)
.format("delta")
.partitionBy("organization_id", "date")
.option("path", table)
.option("checkpointLocation", checkpoint)
.option("mergeSchema", "true")
.start()
spark.catalog.clearCache()
query.awaitTermination()
版本
火花:3.0.1
增量:0.8.0
问题
您认为可能导致此问题的原因是什么?
刚刚将版本升级到 Delta.io 1.0.0,它不再发生了。
问题
目标是让 Spark Streaming 应用程序从 Kafka 读取数据并使用 Delta Lake 创建存储数据。 delta table 的粒度非常细,第一个分区是 organization_id(有超过 5000 个组织),第二个分区是 日期.
应用程序具有预期的延迟,但不会持续 超过一天。错误总是与内存有关,如下所示。
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006f8000000, 671088640, 0) failed; error='Cannot allocate memory' (errno=12)
没有持久化,整个应用程序的内存已经很高。
我试过的
增加内存和工作是我尝试的第一件事,但分区数量也发生了变化,从 4 到 16。
执行脚本
spark-submit \
--verbose \
--master yarn \
--deploy-mode cluster \
--driver-memory 2G \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 4 \
--files s3://my-bucket/log4j-driver.properties,s3://my-bucket/log4j-executor.properties \
--jars /home/hadoop/delta-core_2.12-0.8.0.jar,/usr/lib/spark/external/lib/spark-sql-kafka-0-10.jar \
--class my.package.app \
--conf spark.driver.memoryOverhead=512 \
--conf spark.executor.memoryOverhead=1024 \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.rdd.compress=true \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.yarn.maxAppAttempts=100 \
--conf spark.task.maxFailures=100 \
--conf spark.executor.heartbeatInterval=20s \
--conf spark.network.timeout=300s \
--conf spark.driver.maxResultSize=0 \
--conf spark.driver.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-driver.hprof -Dlog4j.configuration=log4j-driver.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
--conf spark.executor.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-executor.hprof -Dlog4j.configuration=log4j-executor.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
--conf spark.sql.session.timeZone=UTC \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore \
--conf spark.databricks.delta.retentionDurationCheck.enabled=false \
--conf spark.databricks.delta.vacuum.parallelDelete.enabled=true \
--conf spark.sql.shuffle.partitions=16 \
--name "UsageFactProcessor" \
application.jar
代码
val source = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", value = false)
.option("fetchOffset.numRetries", 10)
.option("fetchOffset.retryIntervalMs", 1000)
.option("maxOffsetsPerTrigger", 50000L)
.option("kafkaConsumer.pollTimeoutMs", 300000L)
.load()
val transformed = source
.transform(applySchema)
val query = transformed
.coalesce(16)
.writeStream
.trigger(Trigger.ProcessingTime("1 minute"))
.outputMode(OutputMode.Append)
.format("delta")
.partitionBy("organization_id", "date")
.option("path", table)
.option("checkpointLocation", checkpoint)
.option("mergeSchema", "true")
.start()
spark.catalog.clearCache()
query.awaitTermination()
版本
火花:3.0.1
增量:0.8.0
问题
您认为可能导致此问题的原因是什么?
刚刚将版本升级到 Delta.io 1.0.0,它不再发生了。