Kafka Transactional read committed Consumer
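My application has both a transactional producer and a normal (non-transactional) producer, and both write to the topic kafka-topic, as shown below.
Configuration of the transactional Kafka producer: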
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // List of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    /* The amount of time (ms) to wait before attempting to retry a failed request to a given topic partition.
     * This avoids repeatedly sending requests in a tight loop under some failure scenarios. */
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
    /* The maximum amount of time (ms) the client will wait for the response of a request.
     * If the response is not received before the timeout elapses, the client will resend
     * the request if necessary, or fail the request if retries are exhausted. */
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
    /* Avoid duplicate messages */
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    /* Wait for acknowledgement from the leader and all in-sync replicas */
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    /* Kafka transactional properties */
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); // set the transactional id
    return props;
}

@Bean
public KafkaProducer<String, String> kafkaProducer() {
    return new KafkaProducer<>(producerConfigs());
}
The normal producer uses the same configuration, except that ProducerConfig.CLIENT_ID_CONFIG and ProducerConfig.TRANSACTIONAL_ID_CONFIG are not set.
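For readers who want to see the difference between the two producer configurations side by side, here is a minimal sketch that derives the normal producer's settings from the transactional ones by dropping the two keys mentioned above. It uses the plain property names (client.id, transactional.id, and so on) with java.util only, so it runs without the Kafka client on the classpath. The class name PlainProducerConfigSketch, both method names, and the localhost:9092 address are assumptions made for illustration, and the retry and timeout settings are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class PlainProducerConfigSketch {

    // Hypothetical helper: a subset of the transactional producer settings from the question,
    // written with plain property names instead of ProducerConfig constants.
    static Map<String, Object> transactionalProducerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", true);
        props.put("acks", "all");
        props.put("client.id", "transactional-producer");
        props.put("transactional.id", "test-transactional-id");
        return props;
    }

    // Hypothetical helper: the same settings without client.id and transactional.id,
    // which is how the question describes the normal producer.
    static Map<String, Object> plainProducerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>(transactionalProducerConfigs(bootstrapServers));
        props.remove("client.id");
        props.remove("transactional.id");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Map<String, Object> plain = plainProducerConfigs("localhost:9092");
        System.out.println(plain.containsKey("transactional.id")); // prints "false"
        System.out.println(plain.get("acks"));                     // prints "all"
    }
}
```

A producer built from the second map has no transactional.id, so its records are written outside any transaction.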
The consumer configuration is as follows:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // List of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group");
    // Automatically reset the offset to the earliest offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Auto commit is disabled; offsets are committed manually
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    /* Kafka transactional property -> controls how to read messages written transactionally
     * read_committed   - poll() returns only those transactional messages that have been committed
     * read_uncommitted - poll() returns all messages, even transactional messages that were aborted
     * The default is read_uncommitted.
     */
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
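Since I set isolation.level to read_committed, I expected the consumer to consume only transactional messages from the subscribed topic.
However, it consumes both transactional and non-transactional messages from the topic.
Am I missing some configuration that would make the consumer consume only transactional messages from the subscribed topic?
Thanks in advance :-)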
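It doesn't work that way. isolation.level applies only to records written by transactional producers: with read_committed, those records are delivered only once their transaction has committed. Records published by non-transactional producers are visible to all consumers, regardless of the isolation level.
You need to use two different topics to get the behavior you want.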
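To make the rule concrete, here is a toy in-memory model of what a read_committed consumer ends up seeing on a topic that mixes both kinds of records. It is only a sketch of the visibility rule described above, not Kafka's implementation: the Message class, the txnId field, and visibleToReadCommitted are hypothetical names, and the model ignores offsets and the fact that a real consumer waits for open transactions to finish before reading past them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IsolationLevelSketch {

    // Hypothetical record on the topic; txnId is null when the producer was not transactional.
    static final class Message {
        final String value;
        final String txnId;

        Message(String value, String txnId) {
            this.value = value;
            this.txnId = txnId;
        }
    }

    // Toy version of the read_committed rule: non-transactional records always pass,
    // transactional records pass only if their transaction has committed.
    static List<String> visibleToReadCommitted(List<Message> log, Set<String> committedTxns) {
        List<String> visible = new ArrayList<>();
        for (Message m : log) {
            boolean nonTransactional = m.txnId == null;
            if (nonTransactional || committedTxns.contains(m.txnId)) {
                visible.add(m.value);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Message> log = Arrays.asList(
                new Message("plain-1", null),      // from the normal producer
                new Message("txn-a-1", "txn-a"),   // transaction txn-a was committed
                new Message("txn-b-1", "txn-b"),   // transaction txn-b was aborted
                new Message("plain-2", null));     // from the normal producer
        Set<String> committed = new HashSet<>(Collections.singletonList("txn-a"));

        System.out.println(visibleToReadCommitted(log, committed));
        // prints "[plain-1, txn-a-1, plain-2]"
    }
}
```

In this model the only record that gets filtered out is the one from the aborted transaction, which is why the consumer in the question still receives the normal producer's messages. Giving the normal producer its own topic keeps those messages away from the consumer entirely.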