从 Kafka 主题读取流时,Spark Structured Streaming 是否存在超时问题?
Does Spark Structured Streaming have some timeout issue when reading streams from a Kafka topic?
我实现了一个 spark 作业,以在结构化流中使用 foreachbatch 从 kafka 主题读取流。
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
.option("subscribe", "test-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
.option("kafka.ssl.truststore.password", tspass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("groupIdPrefix","MY_GROUP_ID")
.load()
val streamservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
var stream_df = streamservice
.selectExpr("cast(id as string) id", "cast(x as int) x")
val monitoring_stream = stream_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty) { }
}
.start()
.awaitTermination()
我有以下问题。
如果kafka topic长时间没有数据,stream_df.writeStream会自动终止吗?这个有超时控制吗?
如果从kafka broker中删除kafka主题,是否会stream_df.writeStream终止?
希望在以上两种情况下,spark job一直在监控kafka主题,不终止。我需要对 kafka 连接器进行一些特殊设置吗 and/or stream_df.writerstream?
- If kafka topic does not have data for a long time, will stream_df.writeStream be terminated automatically? Are there some timeout control on this?
查询的终止与正在处理的数据无关。即使没有向您的 Kafka 主题生成新消息,查询也会保持 运行,因为它是 运行 作为流。
我想这就是您在测试时已经弄明白的。我们正在使用结构化流查询来处理来自 Kafka 的数据,并且它们长时间闲置没有问题(例如在 week-end 非工作时间)。
- If kafka topic is deleted from kafka broker, will stream_df.writeStream be terminated?
默认情况下,如果您在查询 运行 时删除 Kafka 主题,则会抛出异常:
ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
我提到“默认”是因为查询选项 failOnDataLoss
默认为 true
。如异常消息中所述,您可以将其设置为 false 以让您的流式查询 运行。此选项在 Structured streaming + Kafka Integration Guide 中描述为:
"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected."
我实现了一个 spark 作业,以在结构化流中使用 foreachbatch 从 kafka 主题读取流。
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
.option("subscribe", "test-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
.option("kafka.ssl.truststore.password", tspass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("groupIdPrefix","MY_GROUP_ID")
.load()
val streamservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
var stream_df = streamservice
.selectExpr("cast(id as string) id", "cast(x as int) x")
val monitoring_stream = stream_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty) { }
}
.start()
.awaitTermination()
我有以下问题。
如果kafka topic长时间没有数据,stream_df.writeStream会自动终止吗?这个有超时控制吗?
如果从kafka broker中删除kafka主题,是否会stream_df.writeStream终止?
希望在以上两种情况下,spark job一直在监控kafka主题,不终止。我需要对 kafka 连接器进行一些特殊设置吗 and/or stream_df.writerstream?
- If kafka topic does not have data for a long time, will stream_df.writeStream be terminated automatically? Are there some timeout control on this?
查询的终止与正在处理的数据无关。即使没有向您的 Kafka 主题生成新消息,查询也会保持 运行,因为它是 运行 作为流。
我想这就是您在测试时已经弄明白的。我们正在使用结构化流查询来处理来自 Kafka 的数据,并且它们长时间闲置没有问题(例如在 week-end 非工作时间)。
- If kafka topic is deleted from kafka broker, will stream_df.writeStream be terminated?
默认情况下,如果您在查询 运行 时删除 Kafka 主题,则会抛出异常:
ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
我提到“默认”是因为查询选项 failOnDataLoss
默认为 true
。如异常消息中所述,您可以将其设置为 false 以让您的流式查询 运行。此选项在 Structured streaming + Kafka Integration Guide 中描述为:
"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected."