Kafka Producer回调性能

Kafka Producer Callback performance

我有 Kafka Produce,它将消息发送到 kafka。我在帮助存储过程的 onsucess 和 onFailure 中将消息记录在数据库中。如代码所示,我正在使用异步

  1. 我是否应该将存储库中的 callStoredProcedure 方法标记为同步以避免死锁?我相信不需要同步,因为回调将在单个线程中按顺序执行。

  2. 从下面link

    https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.

我应该在其他线程中执行回调吗? 你能分享一下如何在其他线程中执行回调的代码片段吗?像 3 线程中的并行回调

我的代码片段

@Autowired
private Myrepository  myrepository;

public void sendMessageToKafka(List<String> message) {
    
            for (String s : message) {
    
                future = kafkaTemplate.send(topicName, message);
    
                future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
                    @Override
                    public void onSuccess(SendResult<String, String> result) {
    
                        System.out.println("Message Sent  " + result.getRecordMetadata().timestamp());
                        
                        myrepository.callStoredProcedure(result,"SUCCESS");
    
                    }
    
                    @Override
                    public void onFailure(Throwable ex) {
    
                        System.out.println(" sending failed  ");
                        myrepository.callStoredProcedure(result,"FAILED");
    
                    }
                });
    
            }

private final ExecutorService exec = Executors.newSingleThreadExecutor();


...

this.exec.submit(() -> myrepository.callStoredProcedure(result,"SUCCESS"));

任务仍将 运行 在单个线程上(但不是 Kafka IO 线程)。

如果它跟不上您的发布速度,您可能需要使用不同的执行器,例如缓存线程池执行器或 Spring 的 ThreadPoolTaskExecutor.