Kafka transactional producer writes messages even if producer commit is not invoked

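To me, the Kafka transactional producer seems to behave like a regular producer: the messages are visible on the topic as soon as send is called for each record. Maybe I am missing something basic. I expected the messages to appear in the topic only after the producer's commit method is invoked. In my code below, producer.commitTransaction() is commented out, but I still get the messages in the topic. Thanks for any pointers.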

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;

    // Wrapper class name is arbitrary; only main() comes from the original snippet.
    public class TransactionalProducerTest {
        public static void main(String[] args) {
            try {
                Properties producerConfig = new Properties();
                producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
                producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
                producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
                producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transactional id
                producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

                Producer<String, String> producer = new KafkaProducer<>(producerConfig);

                producer.initTransactions(); // register the transactional id with the coordinator
                try {
                    producer.beginTransaction(); // begin the transaction
                    for (int i = 0; i < 1000; i++) {
                        producer.send(new ProducerRecord<>("t_test", String.valueOf(i), "value_" + i));
                    }
                    // producer.commitTransaction(); // commit (deliberately commented out)
                } catch (KafkaException e) {
                    // On a Kafka error, abort the transaction.
                    producer.abortTransaction();
                }

                producer.close();
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }
    }

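When it comes to transactions in Kafka, you need to think in terms of a producer/consumer pair. As you observed, the producer on its own just writes data to the topic and then either commits the transaction or does not.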

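The transaction only takes full effect on the consumer side: you "complete" it by setting the KafkaConsumer configuration isolation.level to read_committed (the default is read_uncommitted). This configuration is described as: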

isolation.level: Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode.
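
As a minimal sketch of the consumer side described above, the following builds consumer properties with isolation.level set to read_committed. The class name, the group id and the bootstrap address are placeholders made up for illustration; the property keys are the standard Kafka consumer configuration names. Creating the actual KafkaConsumer needs kafka-clients on the classpath, so that part is left as a comment and the snippet compiles with the JDK alone.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties that hide uncommitted and aborted transactional records.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is read_uncommitted, which returns every record regardless of commit status.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id; replace with your own.
        Properties props = build("localhost:9092", "t_test-reader");
        // With kafka-clients available, the properties would be used like this:
        // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // consumer.subscribe(java.util.Collections.singletonList("t_test"));
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With this setting, a consumer reading t_test should not receive the records sent by the producer in the question until commitTransaction() is actually called.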