为什么 Kafka Consumer 不断收到相同的消息(offset)
Why does Kafka Consumer keep receiving the same messages (offset)
我有一个发送 kafka 请求消息并等待 kafka 响应消息的 SOAP Web 服务(例如 consumer.poll(10000))。
每次调用 Web 服务时,它都会创建一个新的 Kafka 生产者和一个新的 Kafka 消费者。
每次调用 Web 服务时,消费者都会收到相同的消息(例如具有相同偏移量的消息)。
我正在使用 Kafka 0.9 并且启用了自动提交并且自动提交频率为 100 毫秒。
对于我在其自己的 Callable 中处理的 poll() 方法返回的每个 ConsumerRecord,例如
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
final Handler handler = new Handler(consumerRecord);
executor.submit(handler);
}
为什么我总是一遍又一遍地收到相同的消息?
更新 0001
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
基于您显示的代码。我认为你的问题是新的 Consumer 是单线程的。如果您投票一次然后不再进行另一次投票,那么 auto.commit.offset
将无法正常工作。
尝试将您的代码放入一个 while 循环中,看看您何时再次轮询将提交偏移量。
我有一个发送 kafka 请求消息并等待 kafka 响应消息的 SOAP Web 服务(例如 consumer.poll(10000))。
每次调用 Web 服务时,它都会创建一个新的 Kafka 生产者和一个新的 Kafka 消费者。
每次调用 Web 服务时,消费者都会收到相同的消息(例如具有相同偏移量的消息)。
我正在使用 Kafka 0.9 并且启用了自动提交并且自动提交频率为 100 毫秒。
对于我在其自己的 Callable 中处理的 poll() 方法返回的每个 ConsumerRecord,例如
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
final Handler handler = new Handler(consumerRecord);
executor.submit(handler);
}
为什么我总是一遍又一遍地收到相同的消息?
更新 0001
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
基于您显示的代码。我认为你的问题是新的 Consumer 是单线程的。如果您投票一次然后不再进行另一次投票,那么 auto.commit.offset
将无法正常工作。
尝试将您的代码放入一个 while 循环中,看看您何时再次轮询将提交偏移量。