Kafka Consumer(使用reactor kafka)在调用unsubscribe后重新加入组并立即分配主题分区
Kafka Consumer (using reactor kafka) rejoins the group and is assigned topic partitions immediately after invoking unsubscribe
我在取消订阅 Kafka 消费者时遇到问题,请注意我们使用的是 reactor kafka API。它在开始时确实成功取消订阅,但随后它立即加入该组并被分配到该主题的分区,因此基本上它一直保持订阅状态并继续使用来自主题的消息,即使它不应该这样做!
以下是我执行此业务的代码,
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
private final ReceiverOptions<String, byte[]> receiverOptions;
private Disposable disposable;
public void subscribe() {
ReceiverOptions<String, byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> {/*some listener code here*/})
.addRevokeListener(partitions -> {/*some listener code here*/});
Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
disposable = kafkaFlux
.publishOn(Schedulers.fromExecutor(executorService))
.subscribe(record -> {/*consume the record here*/});
}
public void unsubscribe() {
if (disposable != null)
disposable.dispose();
}
当我调用unsubscribe()方法时,如下日志所示;它撤销分区并向协调器发送离开组请求。
ConsumerCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Revoke previously assigned partitions topic-2
AbstractCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Member consumer-cgid-2-f61f347c-b07c-4037-92f5-9a418ac8d153 sending LeaveGroup request to coordinator kafaka_server:9084 (id: 2147483644 rack: null) due to the consumer is being closed
但是,在发生并记录了以下一组事件之后,立即为该消费者分配了主题分区并开始从那里消费消息。请注意,我这里没有调用订阅方法!
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Attempt to heartbeat failed since group is rebalancing
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Revoke previously assigned partitions topic-0, topic-1
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] (Re-)joining group
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully joined group with generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully synced group in generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Notifying assignor about the new Assignment(partitions=[topic-0, topic-1, topic-2])
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Adding newly assigned partitions: topic-0, topic-2, topic-1
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Found no committed offset for partition topic-0
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-2 to the committed offset FetchPosition{offset=2049, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9084 (id: 3 rack: null)], epoch=14}}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-1 to the committed offset FetchPosition{offset=119509, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9083 (id: 2 rack: null)], epoch=19}}
SubscriptionState |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Resetting offset for partition topic-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9082 (id: 1 rack: null)], epoch=22}}.
使用的版本是
org.apache.kafka:kafka-clients:2.8.0
org.apache.kafka:kafka-streams:2.8.0
io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE
这里值得注意的是,还有另一个服务正在为该服务正在使用的主题生成消息。
这可能与 kafka 参数有关,也可能与 kafka 参数无关,但是如果您遇到过这样的问题并且之前已经解决,请告诉我解决方案。
谢谢,
我正在回答我自己的问题。
我编写了一个独立程序来订阅和取消订阅相关主题,该程序按预期运行。它清楚地表明从 Kafka 参数的角度来看根本没有问题(所以应用程序本身有问题)。
在做了一些很好的代码分析并逐行检查代码后,我注意到 subscribe
方法被调用了 2 次。我 commented
其中一个调用然后进行了测试,它表现得很好并且符合预期。
万万没想到,订阅两次该主题,消费者将永远无法取消订阅!
注意 - 即使调用了 2 次 unsubscribe
,此消费者也不会取消订阅该主题。所以如果它已经订阅了两次(或者更可能 - 但我没有测试过),它将永远无法取消订阅!
从 Kafka 的角度来看,这是正常的行为吗?我不太确定,请将此项目保持打开状态以供其他人回复...
谢谢...
我在取消订阅 Kafka 消费者时遇到问题,请注意我们使用的是 reactor kafka API。它在开始时确实成功取消订阅,但随后它立即加入该组并被分配到该主题的分区,因此基本上它一直保持订阅状态并继续使用来自主题的消息,即使它不应该这样做!
以下是我执行此业务的代码,
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
private final ReceiverOptions<String, byte[]> receiverOptions;
private Disposable disposable;
public void subscribe() {
ReceiverOptions<String, byte[]> options = receiverOptions.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> {/*some listener code here*/})
.addRevokeListener(partitions -> {/*some listener code here*/});
Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
disposable = kafkaFlux
.publishOn(Schedulers.fromExecutor(executorService))
.subscribe(record -> {/*consume the record here*/});
}
public void unsubscribe() {
if (disposable != null)
disposable.dispose();
}
当我调用unsubscribe()方法时,如下日志所示;它撤销分区并向协调器发送离开组请求。
ConsumerCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Revoke previously assigned partitions topic-2
AbstractCoordinator |[reactive-kafka-cgid-2]|| [Consumer clientId=consumer-cgid-2, groupId=cgid] Member consumer-cgid-2-f61f347c-b07c-4037-92f5-9a418ac8d153 sending LeaveGroup request to coordinator kafaka_server:9084 (id: 2147483644 rack: null) due to the consumer is being closed
但是,在发生并记录了以下一组事件之后,立即为该消费者分配了主题分区并开始从那里消费消息。请注意,我这里没有调用订阅方法!
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Attempt to heartbeat failed since group is rebalancing
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Revoke previously assigned partitions topic-0, topic-1
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] (Re-)joining group
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully joined group with generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
AbstractCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Successfully synced group in generation Generation{generationId=3775, memberId='consumer-cgid-1-1f5da192-2a14-4634-a0f9-79707518598b', protocol='range'}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Notifying assignor about the new Assignment(partitions=[topic-0, topic-1, topic-2])
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Adding newly assigned partitions: topic-0, topic-2, topic-1
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Found no committed offset for partition topic-0
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-2 to the committed offset FetchPosition{offset=2049, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9084 (id: 3 rack: null)], epoch=14}}
ConsumerCoordinator |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Setting offset for partition topic-1 to the committed offset FetchPosition{offset=119509, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9083 (id: 2 rack: null)], epoch=19}}
SubscriptionState |[reactive-kafka-cgid-1]|| [Consumer clientId=consumer-cgid-1, groupId=cgid] Resetting offset for partition topic-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafaka_server:9082 (id: 1 rack: null)], epoch=22}}.
使用的版本是
org.apache.kafka:kafka-clients:2.8.0
org.apache.kafka:kafka-streams:2.8.0
io.projectreactor.kafka:reactor-kafka:1.1.0.RELEASE
这里值得注意的是,还有另一个服务正在为该服务正在使用的主题生成消息。
这可能与 kafka 参数有关,也可能与 kafka 参数无关,但是如果您遇到过这样的问题并且之前已经解决,请告诉我解决方案。
谢谢,
我正在回答我自己的问题。
我编写了一个独立程序来订阅和取消订阅相关主题,该程序按预期运行。它清楚地表明从 Kafka 参数的角度来看根本没有问题(所以应用程序本身有问题)。
在做了一些很好的代码分析并逐行检查代码后,我注意到 subscribe
方法被调用了 2 次。我 commented
其中一个调用然后进行了测试,它表现得很好并且符合预期。
万万没想到,订阅两次该主题,消费者将永远无法取消订阅!
注意 - 即使调用了 2 次 unsubscribe
,此消费者也不会取消订阅该主题。所以如果它已经订阅了两次(或者更可能 - 但我没有测试过),它将永远无法取消订阅!
从 Kafka 的角度来看,这是正常的行为吗?我不太确定,请将此项目保持打开状态以供其他人回复...
谢谢...