事务性 Kafka 生产者

Transactional Kafka Producer

我正在尝试让我的 kafka 生产者具有事务性。 我正在发送 10 条消息。如果发生任何错误,则不应向 kafka 发送任何消息,即 none 或全部。

我正在使用 Spring Boot KafkaTemplate。

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}

我发送了 10 条如下文档中所述的消息。应该发送 9 条消息,我的消息大小超过 1MB,由于 RecordTooLargeException

而被 Kafka 代理拒绝

https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }

但是当我看到主题时 t_hello_world 有 9 条消息。我的期望是看到 0 条消息,因为我的生产者是事务性的。 我怎样才能实现它?

我收到以下日志

2020-04-30 18:04:36.036 ERROR 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java



Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

2020-04-30 18:04:36.037  WARN 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038  INFO 18688 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 **ms.
2020-04-30 18:04:36.038  INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown**

未提交的记录被写入日志;当事务提交或回滚时,会在日志中写入一条额外的记录,其中包含事务的状态。

默认情况下,消费者会看到所有记录,包括未提交的记录(但不包括特殊的 commit/abort 记录)。

对于控制台消费者,您需要将隔离级别设置为read_committed。查看帮助:

--isolation-level <String>           Set to read_committed in order to      
                                       filter out transactional messages    
                                       which are not committed. Set to      
                                       read_uncommitted to read all          
                                       messages. (default: read_uncommitted)

如果我在 yml 文件中提供以下配置,我是否需要创建示例代码中给出的工厂、模板和 tx bean?

对于给定的 tx 示例,如果我使用简单的 Consumer(java 代码)或 Kafka 工具,我将能够查看上述 Tx 示例的任何记录 - 希望完全不会 - 我是否按照 Tx 示例正确.

spring:
  profiles: local
  kafka:
    producer:
      client-id: book-event-producer-client
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      **transaction-id-prefix: tx-${random.uuid}**
      properties:
        **enable.idempotence: true**
        **acks: all**
        retries: 2        
        metadata.max.idle.ms: 10000