将大型 DataFrame 从 PySpark 写入 Kafka 会超时
Writing large DataFrame from PySpark to Kafka runs into timeout
我正在尝试将一个包含大约 2.3 亿条记录的数据框写入 Kafka。更具体地说 Kafka-enable Azure Event Hub,但我不确定这是否真的是我的问题的根源。
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
这启动良好并成功(并且非常快)将大约 3-4 百万条记录写入队列。但是几分钟后作业停止,并显示如下消息:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 4 times, most recent failure: Lost task 6.3 in stage 7.0 (TID 248, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for mytopic-18: 32839 ms has passed since last append
或
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 8.0 failed 4 times, most recent failure: Lost task 13.3 in stage 8.0 (TID 348, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: The request timed out.
此外,我从未看到 created/written 检查点文件。
我也玩过 .option("kafka.delivery.timeout.ms", 30000)
和不同的值,但似乎没有任何效果。
我在 Azure Databricks 集群版本 5.0(包括 Apache Spark 2.4.0、Scala 2.11)中 运行
我没有在我的事件中心看到任何错误,例如节流,所以应该没问题。
终于弄明白了(大部分):
事实证明,大约 16000 条消息的默认批量大小对于端点来说太大了。在我将 batch.size 参数设置为 5000 后,它开始工作并以每分钟大约 70 万条消息的速度写入事件中心。另外,上面的超时参数是错误的,只是被忽略了。这是kafka.request.timeout.ms
唯一的问题是它仍然随机地超时运行并且显然又从头开始,所以我以重复结束。将为此打开 another question。
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
我正在尝试将一个包含大约 2.3 亿条记录的数据框写入 Kafka。更具体地说 Kafka-enable Azure Event Hub,但我不确定这是否真的是我的问题的根源。
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
这启动良好并成功(并且非常快)将大约 3-4 百万条记录写入队列。但是几分钟后作业停止,并显示如下消息:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 4 times, most recent failure: Lost task 6.3 in stage 7.0 (TID 248, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for mytopic-18: 32839 ms has passed since last append
或
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 8.0 failed 4 times, most recent failure: Lost task 13.3 in stage 8.0 (TID 348, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: The request timed out.
此外,我从未看到 created/written 检查点文件。
我也玩过 .option("kafka.delivery.timeout.ms", 30000)
和不同的值,但似乎没有任何效果。
我在 Azure Databricks 集群版本 5.0(包括 Apache Spark 2.4.0、Scala 2.11)中 运行
我没有在我的事件中心看到任何错误,例如节流,所以应该没问题。
终于弄明白了(大部分):
事实证明,大约 16000 条消息的默认批量大小对于端点来说太大了。在我将 batch.size 参数设置为 5000 后,它开始工作并以每分钟大约 70 万条消息的速度写入事件中心。另外,上面的超时参数是错误的,只是被忽略了。这是kafka.request.timeout.ms
唯一的问题是它仍然随机地超时运行并且显然又从头开始,所以我以重复结束。将为此打开 another question。
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()