为什么新的消费者组id不从头开始
Why doesn't a new consumer group id start from the beginning
我有一个 kafka 0.10 集群,其中有几个主题,这些主题会产生消息。
当我使用 KafkaConsumer 和新的组 ID 订阅主题时,我没有返回任何记录,但是如果我使用 ConsumerRebalanceListener 订阅主题,该 ConsumerRebalanceListener 以相同的组 ID 开头,那么我得到主题中的记录。
@Grab('org.apache.kafka:kafka-clients:0.10.0.0')
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.PartitionInfo
Properties props = new Properties()
props.with {
put("bootstrap.servers","***********:9091")
put("group.id","script-test-noseek")
put("enable.auto.commit","true")
put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
put("session.timeout.ms",30000)
}
KafkaConsumer consumer = new KafkaConsumer(props)
def topicMap = [:]
consumer.listTopics().each { topic, partitioninfo ->
topicMap[topic] = 0
}
topicMap.each {topic, count ->
def stopTime = new Date().time + 30_000
def stop = false
println "Starting topic: $topic"
consumer.subscribe([topic])
//consumer.subscribe([topic], new CRListener(consumer:consumer))
while(!stop) {
ConsumerRecords<String, String> records = consumer.poll(5_000)
topicMap[topic] += records.size()
consumer.commitAsync()
if ( new Date().time > stopTime || records.size() == 0) {
stop = true
}
}
consumer.unsubscribe()
}
def total = 0
println "------------------- Results -----------------------"
topicMap.each { k,v ->
if ( v > 0 ) {
println "Topic: ${k.padRight(64,' ')} Records: ${v}"
}
total += v
}
println "==================================================="
println "Total: ${total}"
def dummy = "Process End"
class CRListener implements ConsumerRebalanceListener {
KafkaConsumer consumer
void onPartitionsAssigned(java.util.Collection partitions) {
consumer.seekToBeginning(partitions)
}
void onPartitionsRevoked(java.util.Collection partitions) {
consumer.commitSync()
}
}
密码是Groovy2.4.x。我屏蔽了 bootstrap 服务器。
如果我取消对监听器的消费者订阅行的注释,它会按照我的预期进行。但实际上我没有得到任何结果。
假设我更改了每个 运行 的组 ID,只是为了不在另一个执行中断的地方继续执行。
我看不出我做错了什么。任何帮助将不胜感激。
如果您使用新的消费者组 ID 并希望从头阅读整个主题,则需要在您的属性中指定参数 "auto.offset.reset=earliest"。 (默认值为 "latest")
Properties props = new Properties()
props.with {
// all other values...
put("auto.offset.reset","earliest")
}
消费者启动时会发生以下情况:
- 寻找(有效的)提交的偏移量以供使用
group.id
- 如果找到(有效的)偏移量,从那里继续
- 如果没有找到(有效的)offset,则根据
auto.offset.reset
设置offset
我有一个 kafka 0.10 集群,其中有几个主题,这些主题会产生消息。
当我使用 KafkaConsumer 和新的组 ID 订阅主题时,我没有返回任何记录,但是如果我使用 ConsumerRebalanceListener 订阅主题,该 ConsumerRebalanceListener 以相同的组 ID 开头,那么我得到主题中的记录。
@Grab('org.apache.kafka:kafka-clients:0.10.0.0')
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.PartitionInfo
Properties props = new Properties()
props.with {
put("bootstrap.servers","***********:9091")
put("group.id","script-test-noseek")
put("enable.auto.commit","true")
put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
put("session.timeout.ms",30000)
}
KafkaConsumer consumer = new KafkaConsumer(props)
def topicMap = [:]
consumer.listTopics().each { topic, partitioninfo ->
topicMap[topic] = 0
}
topicMap.each {topic, count ->
def stopTime = new Date().time + 30_000
def stop = false
println "Starting topic: $topic"
consumer.subscribe([topic])
//consumer.subscribe([topic], new CRListener(consumer:consumer))
while(!stop) {
ConsumerRecords<String, String> records = consumer.poll(5_000)
topicMap[topic] += records.size()
consumer.commitAsync()
if ( new Date().time > stopTime || records.size() == 0) {
stop = true
}
}
consumer.unsubscribe()
}
def total = 0
println "------------------- Results -----------------------"
topicMap.each { k,v ->
if ( v > 0 ) {
println "Topic: ${k.padRight(64,' ')} Records: ${v}"
}
total += v
}
println "==================================================="
println "Total: ${total}"
def dummy = "Process End"
class CRListener implements ConsumerRebalanceListener {
KafkaConsumer consumer
void onPartitionsAssigned(java.util.Collection partitions) {
consumer.seekToBeginning(partitions)
}
void onPartitionsRevoked(java.util.Collection partitions) {
consumer.commitSync()
}
}
密码是Groovy2.4.x。我屏蔽了 bootstrap 服务器。 如果我取消对监听器的消费者订阅行的注释,它会按照我的预期进行。但实际上我没有得到任何结果。
假设我更改了每个 运行 的组 ID,只是为了不在另一个执行中断的地方继续执行。
我看不出我做错了什么。任何帮助将不胜感激。
如果您使用新的消费者组 ID 并希望从头阅读整个主题,则需要在您的属性中指定参数 "auto.offset.reset=earliest"。 (默认值为 "latest")
Properties props = new Properties()
props.with {
// all other values...
put("auto.offset.reset","earliest")
}
消费者启动时会发生以下情况:
- 寻找(有效的)提交的偏移量以供使用
group.id
- 如果找到(有效的)偏移量,从那里继续
- 如果没有找到(有效的)offset,则根据
auto.offset.reset
设置offset