无法在 Delta Lake 中分配内存

Cannot Allocate Memory in Delta Lake

问题

目标是让 Spark Streaming 应用程序从 Kafka 读取数据并使用 Delta Lake 创建存储数据。 delta table 的粒度非常细,第一个分区是 organization_id(有超过 5000 个组织),第二个分区是 日期.

应用程序具有预期的延迟,但不会持续 超过一天。错误总是与内存有关,如下所示。

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000006f8000000, 671088640, 0) failed; error='Cannot allocate memory' (errno=12)

没有持久化,整个应用程序的内存已经很高。

我试过的

增加内存和工作是我尝试的第一件事,但分区数量也发生了变化,从 4 到 16。

执行脚本

spark-submit \
  --verbose \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2G \
  --executor-memory 4G \
  --executor-cores 2 \
  --num-executors 4 \
  --files s3://my-bucket/log4j-driver.properties,s3://my-bucket/log4j-executor.properties \
  --jars /home/hadoop/delta-core_2.12-0.8.0.jar,/usr/lib/spark/external/lib/spark-sql-kafka-0-10.jar \
  --class my.package.app \
  --conf spark.driver.memoryOverhead=512 \
  --conf spark.executor.memoryOverhead=1024 \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.3 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.rdd.compress=true \
  --conf spark.yarn.max.executor.failures=100 \
  --conf spark.yarn.maxAppAttempts=100 \
  --conf spark.task.maxFailures=100 \
  --conf spark.executor.heartbeatInterval=20s \
  --conf spark.network.timeout=300s \
  --conf spark.driver.maxResultSize=0 \
  --conf spark.driver.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-driver.hprof -Dlog4j.configuration=log4j-driver.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
  --conf spark.executor.extraJavaOptions="-XX:-PrintGCDetails -XX:-PrintGCDateStamps -XX:-UseParallelGC -XX:+UseG1GC -XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump-executor.hprof -Dlog4j.configuration=log4j-executor.properties -Dvm.logging.level=ERROR -Dvm.logging.name=UsageFact -Duser.timezone=UTC" \
  --conf spark.sql.session.timeZone=UTC \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore \
  --conf spark.databricks.delta.retentionDurationCheck.enabled=false \
  --conf spark.databricks.delta.vacuum.parallelDelete.enabled=true \
  --conf spark.sql.shuffle.partitions=16 \
  --name "UsageFactProcessor" \
  application.jar

代码

    val source = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", value = false)
      .option("fetchOffset.numRetries", 10)
      .option("fetchOffset.retryIntervalMs", 1000)
      .option("maxOffsetsPerTrigger", 50000L)
      .option("kafkaConsumer.pollTimeoutMs", 300000L)
      .load()

    val transformed = source
      .transform(applySchema)

    val query = transformed
      .coalesce(16)
      .writeStream
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Append)
      .format("delta")
      .partitionBy("organization_id", "date")
      .option("path", table)
      .option("checkpointLocation", checkpoint)
      .option("mergeSchema", "true")
      .start()

    spark.catalog.clearCache()
    query.awaitTermination()

版本

火花:3.0.1

增量:0.8.0

问题

您认为可能导致此问题的原因是什么?

刚刚将版本升级到 Delta.io 1.0.0,它不再发生了。