在 Kafka Send() 方法的 Onfailure() 方法调用中获取对象值

Get Objects value in Onfailure() method call of Kafka Send() method

我想获取那些没有发送到 Kafka 的 Person 对象,即 onFailure() 方法。

1) 我创建了 person 类型的临时数组并将其传递给 onFailure() 。但这并不 work.It 总是显示相同的 temp[0] work.It 总是打印最后一个值在列表中。 2) 我试图创建本地 Person 对象来解决这个问题。但是我得到了编译错误

"Local variable pp defined in an enclosing scope must be final or effectively final"

我该如何解决这个问题?

class Sender {

        @Autowired
        private KafkaTemplate<String, Person> template;

        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);



        Person temp= null;

        // @Transactional("ktm")
        public void sendThem(List<Person> toSend) throws InterruptedException {
            List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();


            ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {

                @Override
                public void onSuccess(SendResult<String, Person> result) {
                    //LOG.info(" message success 1: " + result.getProducerRecord().value());
                    LOG.info(" message success 2: " + temp);
                }

                @Override
                public void onFailure(Throwable ex) {
                    LOG.info(" message failed : "  + temp);

                }
            };

            for (Person p : toSend) {

                temp=p;
                ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);


                future.addCallback(callback);
            }



        }
    }

失败的发送值在 Throwable 中可用 - 将其转换为 KafkaProducerException。您的 PersonproducerRecord.value().

我更新了对 的回答。

您也可以在循环中创建一个新的回调,然后 p 将对您可用;但使用单个回调并从异常中获取 Person 更有效。