Kafka 事务生产者——read_committed 显示记录,尽管中止
Kafka transactional producer — read_committed shows the records despite abort
我写了这个简单的程序来测试 Kafka 中的新事务生产者:
package test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
class kafkatest {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "hello", "world"));
producer.flush();
producer.abortTransaction();
producer.close();
}
}
但是当我使用 isolation.level=read_committed
消费时,那条记录就出现了:
--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
--topic topic \
--from-beginning \
--consumer-property isolation.level=read_committed
world
我错过了什么?
要将 read_committed
与控制台消费者一起使用,您需要指定 --isolation-level
选项:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic --from-beginning --isolation-level=read_committed
此选项默认为 read_uncommitted
并覆盖您通过 --consumer-property
传递的值。
我写了这个简单的程序来测试 Kafka 中的新事务生产者:
package test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
class kafkatest {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "hello", "world"));
producer.flush();
producer.abortTransaction();
producer.close();
}
}
但是当我使用 isolation.level=read_committed
消费时,那条记录就出现了:
--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
--topic topic \
--from-beginning \
--consumer-property isolation.level=read_committed
world
我错过了什么?
要将 read_committed
与控制台消费者一起使用,您需要指定 --isolation-level
选项:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic --from-beginning --isolation-level=read_committed
此选项默认为 read_uncommitted
并覆盖您通过 --consumer-property
传递的值。