Can one Kafka producer class have multiple @EventListener methods?

I am using Spring Kafka and have written a Producer class:

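import java.io.IOException;
import java.util.logging.Logger;

import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
class Producer {

    private static final String TOPIC = "channels";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    // Injected through the constructor that Lombok's @RequiredArgsConstructor generates.
    private final KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void channels_01() throws IOException {
        // sends records to TOPIC
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_02() throws IOException {
        // sends records to TOPIC
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_03() throws IOException {
        // sends records to TOPIC
    }
}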

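Is it possible to run three @EventListener-annotated methods concurrently as producers? All three methods send records to exactly the same topic. Will the Spring container and the Kafka server recognize them as three independent producer clients?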

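By default, a KafkaTemplate backed by DefaultKafkaProducerFactory shares a single producer, so the three methods as written all send through the same producer client. If you want three producer clients, you need either three KafkaTemplate instances, each with its own producer factory, or a DefaultKafkaProducerFactory with the producerPerThread = true option:
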
/**
 * Set to true to create a producer per thread instead of singleton that is shared by
 * all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
 * physically close the producer when it is no longer needed. These producers will not
 * be closed by {@link #destroy()} or {@link #reset()}.
 * @param producerPerThread true for a producer per thread.
 * @since 2.3
 * @see #closeThreadBoundProducer()
 */
public void setProducerPerThread(boolean producerPerThread) {
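
As a configuration sketch of the producerPerThread option, something like the following should work. The class name ProducerConfiguration, the broker address localhost:9092, and the String serializers are assumptions for illustration and are not taken from the question:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
class ProducerConfiguration { // hypothetical class name

    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(props);
        // Create one producer per calling thread instead of a shared singleton.
        factory.setProducerPerThread(true);
        return factory;
    }

    @Bean
    KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

As the Javadoc above says, each thread that obtained a producer this way is expected to call closeThreadBoundProducer() on the factory once it no longer needs it.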

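Then you need to make sure that the ApplicationEventMulticaster is asynchronous; otherwise all three listeners run one after another on the calling thread and share that thread's producer. See SimpleApplicationEventMulticaster: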

/**
 * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
 * to invoke each listener with.
 * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
 * executing all listeners synchronously in the calling thread.
 * <p>Consider specifying an asynchronous task executor here to not block the
 * caller until all listeners have been executed. However, note that asynchronous
 * execution will not participate in the caller's thread context (class loader,
 * transaction association) unless the TaskExecutor explicitly supports this.
 * @see org.springframework.core.task.SyncTaskExecutor
 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
 */
public void setTaskExecutor(@Nullable Executor taskExecutor) {
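
To see why the executor matters, here is a plain-Java sketch with no Spring involved. It only mimics how a multicaster hands listeners to an Executor: a synchronous executor runs every listener on the caller's thread, while an asynchronous one gives each listener its own thread. DispatchDemo and threadsUsed are hypothetical names made up for this illustration:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class DispatchDemo {

    /** Runs listenerCount no-op "listeners" through the executor and returns the threads that ran them. */
    public static Set<Thread> threadsUsed(Executor executor, int listenerCount) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(listenerCount);
        for (int i = 0; i < listenerCount; i++) {
            executor.execute(() -> {
                // Record which thread executed this "listener".
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            // Wait until every listener has finished.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for listeners", e);
        }
        return threads;
    }

    public static void main(String[] args) {
        // Synchronous: runs each task directly on the calling thread.
        Executor sync = Runnable::run;
        // Asynchronous: starts a new thread for every task.
        Executor async = task -> new Thread(task).start();

        System.out.println("sync threads:  " + threadsUsed(sync, 3).size());  // prints "sync threads:  1"
        System.out.println("async threads: " + threadsUsed(async, 3).size()); // prints "async threads: 3"
    }
}
```

With a producer-per-thread factory, one thread means one producer, which is why the synchronous default would still leave the three listeners on a single producer client.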

Register a bean named applicationEventMulticaster and set the desired TaskExecutor on it so that your @EventListener methods are invoked in parallel.
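
A minimal configuration sketch of such a bean might look like this. The class name AsyncEventConfiguration, the choice of SimpleAsyncTaskExecutor, and the "event-" thread name prefix are assumptions for illustration; any TaskExecutor that runs tasks on separate threads should serve the same purpose:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
class AsyncEventConfiguration { // hypothetical class name

    // The application context looks this bean up by the name "applicationEventMulticaster".
    @Bean(name = "applicationEventMulticaster")
    ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        // Starts a new thread per listener invocation; threads are named "event-1", "event-2", ...
        multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor("event-"));
        return multicaster;
    }
}
```

Keep in mind that this multicaster dispatches every application event in the context, so all listeners become asynchronous, not only the three in Producer.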