Kafka transactional producer — read_committed shows the records despite abort

I wrote this simple program to test the new transactional producer in Kafka:

package test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


class kafkatest {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Typed producer instead of the raw type; keys and values are both strings.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.initTransactions();
        producer.beginTransaction();

        producer.send(new ProducerRecord<>("topic", "hello", "world"));  
        producer.flush();

        producer.abortTransaction();

        producer.close();
    }
}

But when I consume with isolation.level=read_committed, the record still shows up:

--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic topic \
        --from-beginning \
        --consumer-property isolation.level=read_committed

world

What am I missing?

To use read_committed with the console consumer, you need to specify the --isolation-level option:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic --from-beginning --isolation-level=read_committed

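This option defaults to read_uncommitted and overrides the value you pass via --consumer-property, so an isolation.level=read_committed set that way never reaches the consumer.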
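
To make the precedence concrete, here is a minimal sketch of how I understand the merge to behave. It is a simplified model, not the tool's actual source: the class name IsolationLevelPrecedence, the method effectiveConsumerProps, and the use of null for "option not given" are all made up for illustration. It only shows why a dedicated option with a default ends up replacing whatever came in through --consumer-property.

```java
import java.util.Properties;

// Simplified, hypothetical model of the option precedence described above.
// It is NOT the console consumer's real code; names are invented for illustration.
public class IsolationLevelPrecedence {

    // Default of the dedicated --isolation-level option, as stated in the answer.
    static final String DEFAULT_ISOLATION_LEVEL = "read_uncommitted";

    // fromConsumerProperty: key/value pairs passed via --consumer-property.
    // isolationLevelOption: value of --isolation-level, or null if it was not given.
    static Properties effectiveConsumerProps(Properties fromConsumerProperty,
                                             String isolationLevelOption) {
        Properties effective = new Properties();
        effective.putAll(fromConsumerProperty);

        // The dedicated option always has a value (explicit or default) and is
        // applied last, so it replaces any isolation.level set above.
        String level = (isolationLevelOption != null)
                ? isolationLevelOption
                : DEFAULT_ISOLATION_LEVEL;
        effective.setProperty("isolation.level", level);
        return effective;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("isolation.level", "read_committed");

        // Only --consumer-property given: the option's default wins.
        System.out.println(
                effectiveConsumerProps(user, null).getProperty("isolation.level"));

        // --isolation-level given explicitly: that value is used.
        System.out.println(
                effectiveConsumerProps(user, "read_committed").getProperty("isolation.level"));
    }
}
```

Under this model the first call prints read_uncommitted and the second prints read_committed, which matches what you saw: the aborted record was visible because the consumer was effectively still reading uncommitted data.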