Can one Kafka Producer Class have multiple @EventListener methods?
I am using Spring Kafka and have written a Producer class:
@Component
@RequiredArgsConstructor
class Producer {

    private static final String TOPIC = "channels";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void channels_01() throws IOException {
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_02() throws IOException {
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_03() throws IOException {
    }
}
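Is it possible to run the 3 @EventListener-annotated methods concurrently as producers?
All 3 methods send records to exactly the same topic. Will the Spring container and the Kafka server recognize them as 3 separate producer clients?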
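If you want three producer clients, you need three KafkaTemplate instances and the producerPerThread = true option of DefaultKafkaProducerFactory, because by default the factory hands one shared singleton producer to all clients: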
/**
* Set to true to create a producer per thread instead of singleton that is shared by
* all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
* physically close the producer when it is no longer needed. These producers will not
* be closed by {@link #destroy()} or {@link #reset()}.
* @param producerPerThread true for a producer per thread.
* @since 2.3
* @see #closeThreadBoundProducer()
*/
public void setProducerPerThread(boolean producerPerThread) {
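As a rough illustration of the option described in this Javadoc, the factory and template could be declared as below. This is only a sketch, not the asker's actual configuration: the class name `ProducerConfiguration`, the broker address `localhost:9092`, and the String serializers are assumptions chosen to match `KafkaTemplate<String, String>`.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
class ProducerConfiguration { // hypothetical class name

    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(props);
        // One producer per calling thread instead of a shared singleton.
        factory.setProducerPerThread(true);
        return factory;
    }

    @Bean
    KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

As the Javadoc states, a thread that has sent records this way should call closeThreadBoundProducer() on the factory once it is finished, because destroy() and reset() will not close these producers. The option is documented as available since 2.3, so it is worth checking the Javadoc of the Spring Kafka version you actually use.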
You then need to make sure the ApplicationEventMulticaster is asynchronous. See SimpleApplicationEventMulticaster:
/**
* Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
* to invoke each listener with.
* <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
* executing all listeners synchronously in the calling thread.
* <p>Consider specifying an asynchronous task executor here to not block the
* caller until all listeners have been executed. However, note that asynchronous
* execution will not participate in the caller's thread context (class loader,
* transaction association) unless the TaskExecutor explicitly supports this.
* @see org.springframework.core.task.SyncTaskExecutor
* @see org.springframework.core.task.SimpleAsyncTaskExecutor
*/
public void setTaskExecutor(@Nullable Executor taskExecutor) {
Register a bean named applicationEventMulticaster for it and set the desired TaskExecutor so that your @EventListener methods are invoked in parallel.
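Such a bean could look roughly like the following. The sketch assumes `SimpleAsyncTaskExecutor`, one of the executors referenced in the Javadoc above; the class name `AsyncEventConfiguration` and the thread name prefix `event-` are made up for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
class AsyncEventConfiguration { // hypothetical class name

    // The bean name must be exactly "applicationEventMulticaster" so that the
    // application context picks it up instead of creating its default multicaster.
    @Bean(name = "applicationEventMulticaster")
    ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        // Each listener invocation is handed to the executor instead of running
        // synchronously in the publishing thread. The prefix is an assumption.
        multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor("event-"));
        return multicaster;
    }
}
```

Keep in mind that this multicaster dispatches every application event in the context, so all listeners become asynchronous, not only the three in Producer. SimpleAsyncTaskExecutor starts a new thread for each task, so combined with producerPerThread = true each listener method would obtain its own producer, which it should release with closeThreadBoundProducer() when it is done.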