How to specify the group id of the Kafka consumer for Spark Structured Streaming?
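I want to run 2 Spark Structured Streaming jobs in the same EMR cluster to consume the same Kafka topic. Both jobs are in the running state. However, only one job receives the Kafka data. My configuration for the Kafka part is as follows.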
.format("kafka")
.option("kafka.bootstrap.servers", "xxx")
.option("subscribe", "sametopic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "./cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.load()
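I did not set group.id. I suspect that both jobs using the same group id causes this problem. However, when I set group.id, Spark complains that "user-specified consumer groups are not used to track offsets". What is the correct way to solve this problem? Thanks!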
You need to run Spark v3; the kafka.group.id option is not supported in earlier versions.
From https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html:
kafka.group.id
The Kafka group id to use in Kafka consumer while reading from Kafka.
Use this with caution. By default, each query generates a unique group
id for reading data. This ensures that each Kafka source has its own
consumer group that does not face interference from any other
consumer, and therefore can read all of the partitions of its
subscribed topics. In some scenarios (for example, Kafka group-based
authorization), you may want to use a specific authorized group id to
read data. You can optionally set the group id. However, do this with
extreme caution as it can cause unexpected behavior. Concurrently
running queries (both, batch and streaming) or sources with the same
group id are likely interfere with each other causing each query to
read only part of the data. This may also occur when queries are
started/restarted in quick succession. To minimize such issues, set
the Kafka consumer session timeout (by setting option
"kafka.session.timeout.ms") to be very small. When this is set, option
"groupIdPrefix" will be ignored.
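
As a rough illustration of the option described above, here is a minimal PySpark sketch, assuming Spark 3.0 or later. The helper names (kafka_source_options, read_topic) and the group ids "job-a-group" and "job-b-group" are hypothetical and only serve the example; the security options from the question are left out for brevity. It gives each job its own group id, since the quoted passage warns that queries sharing a group id are likely to read only part of the data.

```python
from typing import Dict, Optional


def kafka_source_options(
    bootstrap_servers: str,
    topic: str,
    group_id: Optional[str] = None,
    group_id_prefix: Optional[str] = None,
) -> Dict[str, str]:
    """Build the option map for a Structured Streaming Kafka source.

    Hypothetical helper for illustration; it is not part of Spark.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    if group_id is not None:
        # Explicit consumer group id. Per the quoted documentation,
        # "groupIdPrefix" is ignored when this option is set.
        options["kafka.group.id"] = group_id
    elif group_id_prefix is not None:
        # Keep Spark's generated unique group id, but control its prefix.
        options["groupIdPrefix"] = group_id_prefix
    return options


def read_topic(spark, options: Dict[str, str]):
    """Create the streaming DataFrame; `spark` is an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


# Assumed group ids: one distinct, authorized group per job.
job_a = kafka_source_options("xxx", "sametopic", group_id="job-a-group")
job_b = kafka_source_options("xxx", "sametopic", group_id="job-b-group")
```

In each job, the resulting map would be passed to the reader, for example read_topic(spark, job_a). If a fixed group id is not required (for instance, when authorization is granted by group prefix), passing group_id_prefix instead keeps Spark's default behavior of generating a unique group id per query.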