Kafka中如何使用事务以及如何使用abortTransaction?

How to use transactions in Kafka and how to use abortTransaction?

我是 kafka 的新手,我使用 Kafka Producer Java api。 面对卡夫卡的这个问题, Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION.

人们写道 producer.abortTransaction() 应该只在没有交易进行中时调用.... 知道如何检查飞行中是否有交易吗?以及如何 clear/stop 他们?

这是我的代码:

try { 
  producer.send(record, new Callback() { 
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
      if ( e != null){ 
        logger.info("Record was not sent due to kafka issue");
        throw new KafkaException("Record was not sent due to kafka issue");
      }
    }
  });
} catch (KafkaException e){
  producer.abortTransaction(); 
}

What I need to achieve is detecting when the kafka is stopped and in this case clear all the buffers so the records In these buffers do not appear in the consumer side when the kafka is started again.

在这种情况下,您通常会做的是应用 KafkaProducer 的 Java 文档中描述的交易:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();

这样,如果 isolation.level 设置为 read_committed

,则 100 条记录中的全部或 none 对消费者可见

您正在关闭 producer.close() non-recoverable 个异常,例如

  • ProducerFencedException:这个致命异常表明另一个具有相同transactional.id的生产者已经启动。在任何给定时间只能有一个具有 transactional.id 的生产者实例,并且要启动的最新实例“隔离”了先前的实例,以便它们不再发出事务请求。遇到这个异常,必须关闭producer实例。

  • OutOfOrderSequenceException:这个异常表明代理从生产者那里收到了一个意外的序列号,这意味着数据可能已经丢失。如果生产者仅配置为幂等(即,如果设置了 enable.idempotence 并且未配置 transactional.id),则可以继续使用相同的生产者实例发送,但这样做有重新排序已发送记录的风险。对于事务生产者,这是一个致命错误,您应该关闭生产者。

  • AuthorizationException: [self-explanatory]