spring 中的 Kafka Consumer 我可以通过编程方式重新分配分区吗?
Kafka Consumer in spring can I re-assign partitions programmatically?
我是 Kafka 的新手,使用 @KafkaListener
(spring) 来定义 kafka 消费者。
我想检查是否可以在运行时手动将分区分配给消费者。
例如,当应用程序启动时我不想“消耗”任何数据。为此,我目前正在使用 @KafkaListener(autoStartup=false ... )
。
在某些时候,我应该收到一个通知(来自应用程序的另一部分),其中包含要处理的 partitionId,所以我想“跳到”该分区的最新可用偏移量,因为我不需要使用碰巧已经存在的数据并将 KafkaConsumer 与该通知中的 partitionId“关联”。
稍后我可能会收到“停止收听此分区”的通知,尽管事实上存在于其他地方的生产者一直在写入该主题和该分区,因此我应该“取消链接”消费者分区并停止接收消息。
我看到有一个 org.springframework.kafka.annotation.TopicPartition
但它提供了一种指定“静态”关联的方法,所以我正在寻找一种“动态”的方法。
我想我可以求助于低级 Kafka 客户端 API 但我真的更愿意在这里使用 spring。
更新
我将主题 cnp_multi_partition_test_topic
与 3 个分区一起使用。
我当前尝试从消费者动态管理分区的代码如下所示:
@Slf4j
public class SampleKafkaConsumer {
@KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
Object sessionId = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]",partitionId, sessionId, data);
}
}
@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {
private final KafkaListenerEndpointRegistry registry;
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final UUIDProvider uuidProvider;
private ConcurrentMessageListenerContainer<String, String> container;
public void assignPartitions(List<Integer> partitions) {
if(container != null) {
container.stop();
container = null;
}
if(partitions.isEmpty()) {
return;
}
var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
container =
factory.createContainer(newTopicPartitionOffsets);
container.getContainerProperties().setMessageListener(
registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
// random group
container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
container.setConcurrency(1);
container.start();
}
private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
return partitions.stream()
.map(p -> new TopicPartitionOffset(TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
.collect(Collectors.toList())
.toArray(new TopicPartitionOffset[] {});
}
}
两者都是 Spring beans(单例),通过 java 配置管理。
生产者每秒生成 3 条消息并将其发送到测试主题的 3 个分区中。我使用了 kafka UI 工具来确保所有消息确实按预期到达 我使用 @EventListener
和 @Async
使其同时发生。
以下是我如何尝试模拟工作:
@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {
@Autowired
MultiPartitionKafkaConsumerManager manager;
@Test
public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
log.info("Starting the test");
sleep(5_000);
log.info("Start listening on partition 0");
manager.assignPartitions(List.of(0));
sleep(10_000);
log.info("Start listening on partition 0,2");
manager.assignPartitions(List.of(0,2));
sleep(10_000);
log.info("Do not listen on partition 0 anymore");
manager.assignPartitions(List.of(2));
sleep(10_000);
log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
manager.assignPartitions(Collections.emptyList());
sleep(10_000);
日志显示如下:
06:34:20.164 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped
所以我确实看到消费者已启动,它甚至尝试在内部轮询记录,但我想我看到 WakeupException 被抛出并被代理“吞没”。我不确定我是否理解为什么会这样?
您无法在运行时更改手动分配。有几种方法可以达到您想要的结果。
您可以在原型bean 中声明侦听器;参见
您可以使用侦听器容器工厂创建一个具有适当主题配置的新容器,并从静态声明的容器中复制侦听器。
如果需要,我可以提供后者的示例。
...
编辑
这是第二种技术的示例...
@SpringBootApplication
public class So69465733Application {
public static void main(String[] args) {
SpringApplication.run(So69465733Application.class, args);
}
@KafkaListener(id = "dummy", topics = "dummy", autoStartup = "false")
void listen(String in) {
System.out.println(in);
}
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return args -> {
System.out.println("Hit Enter to create a container for topic1, partition0");
System.in.read();
ConcurrentMessageListenerContainer<String, String> container1 =
factory.createContainer(new TopicPartitionOffset("topic1", 0, SeekPosition.END));
container1.getContainerProperties().setMessageListener(
registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
container1.getContainerProperties().setGroupId("topic1-0-group2");
container1.start();
System.out.println("Hit Enter to create a container for topic2, partition0");
System.in.read();
ConcurrentMessageListenerContainer<String, String> container2 =
factory.createContainer(new TopicPartitionOffset("topic2", 0, SeekPosition.END));
container2.getContainerProperties().setMessageListener(
registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
container2.getContainerProperties().setGroupId("topic2-0-group2");
container2.start();
System.in.read();
container1.stop();
container2.stop();
};
}
}
编辑
从命令行生产者向topic1、topic2发送记录后记录。
Hit Enter to create a container for topic1, partition0
ConsumerConfig values:
...
Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622966736
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Subscribed to partition(s): topic1-0
Hit Enter to create a container for topic2, partition0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Seeking to LATEST offset of partition topic1-0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Resetting offset for partition topic1-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
ConsumerConfig values:
...
Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622969071
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Subscribed to partition(s): topic2-0
Hit Enter to stop containers
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Seeking to LATEST offset of partition topic2-0
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Resetting offset for partition topic2-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
record from topic1
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
record from topic2
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
Application shutdown requested.
我是 Kafka 的新手,使用 @KafkaListener
(spring) 来定义 kafka 消费者。
我想检查是否可以在运行时手动将分区分配给消费者。
例如,当应用程序启动时我不想“消耗”任何数据。为此,我目前正在使用 @KafkaListener(autoStartup=false ... )
。
在某些时候,我应该收到一个通知(来自应用程序的另一部分),其中包含要处理的 partitionId,所以我想“跳到”该分区的最新可用偏移量,因为我不需要使用碰巧已经存在的数据并将 KafkaConsumer 与该通知中的 partitionId“关联”。
稍后我可能会收到“停止收听此分区”的通知,尽管事实上存在于其他地方的生产者一直在写入该主题和该分区,因此我应该“取消链接”消费者分区并停止接收消息。
我看到有一个 org.springframework.kafka.annotation.TopicPartition
但它提供了一种指定“静态”关联的方法,所以我正在寻找一种“动态”的方法。
我想我可以求助于低级 Kafka 客户端 API 但我真的更愿意在这里使用 spring。
更新
我将主题 cnp_multi_partition_test_topic
与 3 个分区一起使用。
我当前尝试从消费者动态管理分区的代码如下所示:
@Slf4j
public class SampleKafkaConsumer {
@KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
Object sessionId = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]",partitionId, sessionId, data);
}
}
@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {
private final KafkaListenerEndpointRegistry registry;
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final UUIDProvider uuidProvider;
private ConcurrentMessageListenerContainer<String, String> container;
public void assignPartitions(List<Integer> partitions) {
if(container != null) {
container.stop();
container = null;
}
if(partitions.isEmpty()) {
return;
}
var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
container =
factory.createContainer(newTopicPartitionOffsets);
container.getContainerProperties().setMessageListener(
registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
// random group
container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
container.setConcurrency(1);
container.start();
}
private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
return partitions.stream()
.map(p -> new TopicPartitionOffset(TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
.collect(Collectors.toList())
.toArray(new TopicPartitionOffset[] {});
}
}
两者都是 Spring beans(单例),通过 java 配置管理。
生产者每秒生成 3 条消息并将其发送到测试主题的 3 个分区中。我使用了 kafka UI 工具来确保所有消息确实按预期到达 我使用 @EventListener
和 @Async
使其同时发生。
以下是我如何尝试模拟工作:
@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {
@Autowired
MultiPartitionKafkaConsumerManager manager;
@Test
public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
log.info("Starting the test");
sleep(5_000);
log.info("Start listening on partition 0");
manager.assignPartitions(List.of(0));
sleep(10_000);
log.info("Start listening on partition 0,2");
manager.assignPartitions(List.of(0,2));
sleep(10_000);
log.info("Do not listen on partition 0 anymore");
manager.assignPartitions(List.of(2));
sleep(10_000);
log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
manager.assignPartitions(Collections.emptyList());
sleep(10_000);
日志显示如下:
06:34:20.164 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped
所以我确实看到消费者已启动,它甚至尝试在内部轮询记录,但我想我看到 WakeupException 被抛出并被代理“吞没”。我不确定我是否理解为什么会这样?
您无法在运行时更改手动分配。有几种方法可以达到您想要的结果。
您可以在原型bean 中声明侦听器;参见
您可以使用侦听器容器工厂创建一个具有适当主题配置的新容器,并从静态声明的容器中复制侦听器。
如果需要,我可以提供后者的示例。
...
编辑
这是第二种技术的示例...
@SpringBootApplication
public class So69465733Application {
public static void main(String[] args) {
SpringApplication.run(So69465733Application.class, args);
}
@KafkaListener(id = "dummy", topics = "dummy", autoStartup = "false")
void listen(String in) {
System.out.println(in);
}
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return args -> {
System.out.println("Hit Enter to create a container for topic1, partition0");
System.in.read();
ConcurrentMessageListenerContainer<String, String> container1 =
factory.createContainer(new TopicPartitionOffset("topic1", 0, SeekPosition.END));
container1.getContainerProperties().setMessageListener(
registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
container1.getContainerProperties().setGroupId("topic1-0-group2");
container1.start();
System.out.println("Hit Enter to create a container for topic2, partition0");
System.in.read();
ConcurrentMessageListenerContainer<String, String> container2 =
factory.createContainer(new TopicPartitionOffset("topic2", 0, SeekPosition.END));
container2.getContainerProperties().setMessageListener(
registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
container2.getContainerProperties().setGroupId("topic2-0-group2");
container2.start();
System.in.read();
container1.stop();
container2.stop();
};
}
}
编辑
从命令行生产者向topic1、topic2发送记录后记录。
Hit Enter to create a container for topic1, partition0
ConsumerConfig values:
...
Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622966736
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Subscribed to partition(s): topic1-0
Hit Enter to create a container for topic2, partition0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Seeking to LATEST offset of partition topic1-0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Resetting offset for partition topic1-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
ConsumerConfig values:
...
Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622969071
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Subscribed to partition(s): topic2-0
Hit Enter to stop containers
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Seeking to LATEST offset of partition topic2-0
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Resetting offset for partition topic2-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
record from topic1
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
record from topic2
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
Application shutdown requested.