Kafka (0.9.0.1) offsets disappear when all consumers die

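As far as I understand, as of Kafka 0.9.0.1 offsets are managed in a Kafka topic. The strange thing is that when a consumer group dies, its offsets are deleted from the topic. The next time I start with the same consumerGroupId, the offsets are reset to the earliest position.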

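Is this expected? I would like the offsets to persist even if the consumer group has died completely, so that when it comes back up it resumes from the offset where it left off.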

Here are the settings in consumer.config:

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class fk.el.client.mappers.kafka.KafkaDeserializer
group.id = testConsumerId
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [k1:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = testAppId1463671497716
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest

I see this in the logs:

20:55:00.024 DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing leader SyncGroup (SYNC_GROUP: {group_id=testConsumerId,generation_id=1,member_id=testAppId1463671497716-d1ce3669-b451-4197-a5dd-39dd38c61102,group_assignment=[{member_id=testAppId1463671497716-d1ce3669-b451-4197-a5dd-39dd38c61102,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]}) to coordinator 2147483647
20:55:00.379 DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful sync group response for group testConsumerId: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}
20:55:00.431 DEBUG o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [sampleEntity-1, sampleEntity-0]
20:55:00.432 DEBUG o.a.k.c.c.i.ConsumerCoordinator - Fetching committed offsets for partitions: [sampleEntity-1, sampleEntity-0]
20:55:00.605 DEBUG o.a.k.c.c.i.ConsumerCoordinator - No committed offset for partition sampleEntity-1
20:55:00.606 o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition sampleEntity-1 to earliest offset.
20:55:00.732 o.a.k.c.consumer.internals.Fetcher - Fetched offset 0 for partition sampleEntity-1
20:55:00.732 o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition sampleEntity-0 to the committed offset 25

In the server log, when the consumer starts and dies:

[2016-05-19 16:09:50,113] INFO [GroupCoordinator 0]: Preparing to restabilize group testConsumerId with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,114] INFO [GroupCoordinator 0]: Stabilized group testConsumerId generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,125] INFO [GroupCoordinator 0]: Assignment received from leader for group testConsumerId for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:09:50,158] TRACE [Group Metadata Manager on Broker 0]: Getting offsets Vector([sampleEntity,1], [sampleEntity,0]) for group testConsumerId. (kafka.coordinator.GroupMetadataManager)
[2016-05-19 16:10:38,158] TRACE [GroupCoordinator 0]: Member testAppId1463674187858-ea8c9c30-4c9d-4b52-bfef-44c299442d45 in group testConsumerId has failed (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:10:38,158] INFO [GroupCoordinator 0]: Preparing to restabilize group testConsumerId with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-05-19 16:10:38,158] TRACE [Group Metadata Manager on Broker 0]: Marking group testConsumerId as deleted. (kafka.coordinator.GroupMetadataManager)
[2016-05-19 16:10:38,159] INFO [GroupCoordinator 0]: Group testConsumerId generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

After it is dead and removed, I can no longer access the earlier offsets.

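Actually, the offsets were being recorded correctly. There were simply too many partitions, and most of them were empty. When a partition is empty, polling returns nothing for it, so no offset is ever committed for it (not even zero). Then, every time you run the consumer, those partitions show up in the logs as "no offsets available, so using RESET".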
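
To make that concrete, here is a toy model of the behaviour, not the Kafka client API. The class `OffsetResetModel` and its methods are made up for illustration and use only the JDK. It assumes the consumer commits an offset only for partitions that actually returned records, which is what happened in my case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model; none of these names exist in the Kafka client.
public class OffsetResetModel {
    // Stand-in for the committed offsets stored for one consumer group.
    private final Map<String, Long> committed = new HashMap<>();

    // Simulates one run: an offset is committed only if records were consumed.
    public void consume(String partition, long recordCount) {
        if (recordCount > 0) {
            committed.merge(partition, recordCount, Long::sum);
        }
        // Empty partition: nothing polled, nothing committed (not even 0).
    }

    // What a restarted consumer would report for the given partition.
    public String startupMessage(String partition) {
        Long offset = committed.get(partition);
        if (offset == null) {
            return "No committed offset for partition " + partition
                    + ", resetting to earliest";
        }
        return "Resetting offset for partition " + partition
                + " to the committed offset " + offset;
    }

    public static void main(String[] args) {
        OffsetResetModel group = new OffsetResetModel();
        group.consume("sampleEntity-0", 25); // partition with data
        group.consume("sampleEntity-1", 0);  // empty partition
        System.out.println(group.startupMessage("sampleEntity-0"));
        System.out.println(group.startupMessage("sampleEntity-1"));
    }
}
```

This mirrors the client log above: sampleEntity-0 resumes from committed offset 25, while the empty sampleEntity-1 has no committed offset and falls back to auto.offset.reset = earliest. Nothing was lost; the reset message just looks alarming.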