如何为并发消费者获取线程本地?
How to get threadlocal for concurrency consumer?
我正在开发 spring kafka 消费者。由于消息量大,我需要使用并发来确保吞吐量。由于使用并发性,我使用 threadlocal 对象来保存基于线程的数据。现在我需要在使用后删除这个 threadlocal 对象。
Spring 带有以下链接的文档建议实现一个监听事件 ConsumerStoppedEvent 的 EventListener。但是没有提到任何示例事件监听器代码来获取线程本地对象并删除该值。您能否让我知道在这种情况下如何获取 threadlocal 实例?
代码示例将不胜感激。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#thread-safety
像这样:
@SpringBootApplication
public class So71884752Application {
public static void main(String[] args) {
SpringApplication.run(So71884752Application.class, args);
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic1").partitions(2).build();
}
@Component
static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {
private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();
@KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
public void listen() {
long id = Thread.currentThread().getId();
System.out.println("set thread id to ThreadLocal: " + id);
threadLocalState.set(id);
}
@Override
public void onApplicationEvent(ConsumerStoppedEvent event) {
System.out.println("Remove from ThreadLocal: " + threadLocalState.get());
threadLocalState.remove();
}
}
}
因此,对于主题中的这两个分区,我有两个并发侦听器容器。无论如何,他们每个人都会将此称为我的 @KafkaListener
方法。我将线程 ID 存储到 ThreadLocal
中。对于简单 use-case 和测试功能。
我实现了 ApplicationListener<ConsumerStoppedEvent>
,它在适当的消费者线程中发出。那个帮助我提取 ThreadLocal
价值并在消费者生命结束时清理它。
针对嵌入式 Kafka 的测试如下所示:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class So71884752ApplicationTests {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
void contextLoads() throws InterruptedException {
this.kafkaTemplate.send("topic1", "1", "foo");
this.kafkaTemplate.send("topic1", "2", "bar");
this.kafkaTemplate.flush();
Thread.sleep(1000); // Give it a chance to consume data
this.kafkaListenerEndpointRegistry.stop();
}
}
没错。它不验证任何内容,但它演示了该事件是如何发生的。
我在日志输出中看到类似这样的内容:
set thread id to ThreadLocal: 125
set thread id to ThreadLocal: 127
...
Remove from ThreadLocal: 125
Remove from ThreadLocal: 127
所以,无论医生说什么都是正确的。
我正在开发 spring kafka 消费者。由于消息量大,我需要使用并发来确保吞吐量。由于使用并发性,我使用 threadlocal 对象来保存基于线程的数据。现在我需要在使用后删除这个 threadlocal 对象。 Spring 带有以下链接的文档建议实现一个监听事件 ConsumerStoppedEvent 的 EventListener。但是没有提到任何示例事件监听器代码来获取线程本地对象并删除该值。您能否让我知道在这种情况下如何获取 threadlocal 实例? 代码示例将不胜感激。 https://docs.spring.io/spring-kafka/docs/current/reference/html/#thread-safety
像这样:
@SpringBootApplication
public class So71884752Application {
public static void main(String[] args) {
SpringApplication.run(So71884752Application.class, args);
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic1").partitions(2).build();
}
@Component
static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {
private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();
@KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
public void listen() {
long id = Thread.currentThread().getId();
System.out.println("set thread id to ThreadLocal: " + id);
threadLocalState.set(id);
}
@Override
public void onApplicationEvent(ConsumerStoppedEvent event) {
System.out.println("Remove from ThreadLocal: " + threadLocalState.get());
threadLocalState.remove();
}
}
}
因此,对于主题中的这两个分区,我有两个并发侦听器容器。无论如何,他们每个人都会将此称为我的 @KafkaListener
方法。我将线程 ID 存储到 ThreadLocal
中。对于简单 use-case 和测试功能。
我实现了 ApplicationListener<ConsumerStoppedEvent>
,它在适当的消费者线程中发出。那个帮助我提取 ThreadLocal
价值并在消费者生命结束时清理它。
针对嵌入式 Kafka 的测试如下所示:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class So71884752ApplicationTests {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
void contextLoads() throws InterruptedException {
this.kafkaTemplate.send("topic1", "1", "foo");
this.kafkaTemplate.send("topic1", "2", "bar");
this.kafkaTemplate.flush();
Thread.sleep(1000); // Give it a chance to consume data
this.kafkaListenerEndpointRegistry.stop();
}
}
没错。它不验证任何内容,但它演示了该事件是如何发生的。
我在日志输出中看到类似这样的内容:
set thread id to ThreadLocal: 125
set thread id to ThreadLocal: 127
...
Remove from ThreadLocal: 125
Remove from ThreadLocal: 127
所以,无论医生说什么都是正确的。