在 Kafka Send() 方法的 Onfailure() 方法调用中获取对象值
Get Objects value in Onfailure() method call of Kafka Send() method
我想获取那些没有发送到 Kafka 的 Person 对象,即 onFailure() 方法。
1) 我创建了 person 类型的临时数组并将其传递给 onFailure() 。但这并不 work.It 总是显示相同的 temp[0] work.It 总是打印最后一个值在列表中。
2) 我试图创建本地 Person 对象来解决这个问题。但是我得到了编译错误
"Local variable pp defined in an enclosing scope must be final or effectively final"
我该如何解决这个问题?
class Sender {
@Autowired
private KafkaTemplate<String, Person> template;
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
Person temp= null;
// @Transactional("ktm")
public void sendThem(List<Person> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();
ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {
@Override
public void onSuccess(SendResult<String, Person> result) {
//LOG.info(" message success 1: " + result.getProducerRecord().value());
LOG.info(" message success 2: " + temp);
}
@Override
public void onFailure(Throwable ex) {
LOG.info(" message failed : " + temp);
}
};
for (Person p : toSend) {
temp=p;
ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);
future.addCallback(callback);
}
}
}
失败的发送值在 Throwable
中可用 - 将其转换为 KafkaProducerException
。您的 Person
在 producerRecord.value()
.
我更新了对 的回答。
您也可以在循环中创建一个新的回调,然后 p
将对您可用;但使用单个回调并从异常中获取 Person
更有效。
我想获取那些没有发送到 Kafka 的 Person 对象,即 onFailure() 方法。
1) 我创建了 person 类型的临时数组并将其传递给 onFailure() 。但这并不 work.It 总是显示相同的 temp[0] work.It 总是打印最后一个值在列表中。 2) 我试图创建本地 Person 对象来解决这个问题。但是我得到了编译错误
"Local variable pp defined in an enclosing scope must be final or effectively final"
我该如何解决这个问题?
class Sender {
@Autowired
private KafkaTemplate<String, Person> template;
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
Person temp= null;
// @Transactional("ktm")
public void sendThem(List<Person> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();
ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {
@Override
public void onSuccess(SendResult<String, Person> result) {
//LOG.info(" message success 1: " + result.getProducerRecord().value());
LOG.info(" message success 2: " + temp);
}
@Override
public void onFailure(Throwable ex) {
LOG.info(" message failed : " + temp);
}
};
for (Person p : toSend) {
temp=p;
ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);
future.addCallback(callback);
}
}
}
失败的发送值在 Throwable
中可用 - 将其转换为 KafkaProducerException
。您的 Person
在 producerRecord.value()
.
我更新了对
您也可以在循环中创建一个新的回调,然后 p
将对您可用;但使用单个回调并从异常中获取 Person
更有效。