执行方法 Kafka 回调

Execute a method Kafka Callback

我有一个 RestController,它调用 KafkaSendMethod() 向 Kafka MQ 发送消息

 @RestController
@RequestMapping("string-rest")
public class SimpleBookRestController {

   @Autowire
    private KafkaSendClass kafkaSendClass;
    
    @GetMapping( produces = "application/json")
    public void postMethod() {
    
    ArrayList<String> gfg = new ArrayList<String>(); 

    gfg.add("a"); 
    gfg.add("b"); 
    gfg.add("c");
    
    kafkaSendClass.KafkaSendMethod(gfg);
    
    }

这是向 Kafka 发送消息的class

@Service
    class KafkaSendClass
    {
    
    @Autowired
private KafkaTemplate<String, String> kafkaTemplate;


    void KafkaSendMethod(List<Strings> strList)
    {
    
    for ( String str : strList )
    (
    ListenableFuture<SendResult<String,String>>   future = kafkaTemplate.send(str , str );
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
        @Override
        public void onSuccess(SendResult<String, String> result) {
            syso("sent success");
            m1(); //call only once for one call to KafkaSendMethod() ;
        }
    
        @Override
        public void onFailure(Throwable ex) {
            System.out.println(" sending failed");
        }
    });

); 所以我的问题是每次调用 KafkaSendMethod(List strList) 调用时如何调用 m1() 一次。

看看你是否可以使用 CompletableFuture.allOf(...)

将那些 Future 收集到一个列表中并使用它们的 completable() 改编。然后为 CompletableFuture 调用 List.toArray()CompletableToListenableFutureAdapter 添加您的最终回调。