Kafka Transaction Manager sends to Kafka Broker despite transaction rolling back

My Kafka producer keeps sending data to the Kafka broker even though the transaction fails. I have a custom listener, i.e. I am not using the @KafkaListener annotation. This is running on spring-kafka 2.2.x.

Any idea why the message ends up in Kafka even though the KafkaTransactionManager rolls back? Below is my setup:

// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload){
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, payload);
    template.executeInTransaction(kt -> kt.send(record));
}

// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final String header, final Object payload){
    template.convertAndSend(topic, header, payload);
}

// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
       @Qualifier(value = "transactionalKafkaProducer") ProducerFactory<String, Object> producerFactory,
       @Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
       @Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
   KafkaTransactionManager producerKtm = new KafkaTransactionManager(producerFactory);
   producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
   return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}


// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);



// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer){
    
    try {
            RetryState retryState = new DefaultRetryState(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());

            retryTemplate.execute(context -> {
                saveToDb(); // This rolls back
                sendToKafkaWithTransaction(topic, payload); // This message gets to Kafka; it should not.
                sendToRabbitmqWithTransaction(topic, payload);  // This rolls back
                throw new Exception("Out of Anger");
                return null;
            }, recoveryCallBack, retryState);

            acknowledgment.acknowledge();
      }
      catch (ListenerExecutionFailedException e) {
         throw e;
      }
}    

// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager       : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(104745239<open>)] rollback-only

Edit: adding the Spring Boot configuration:

spring.kafka:
  admin:
    bootstrap-servers: ${kakfa.host}
  consumer:
    group-id: test-consumers
    client-id: test-consumers
    auto-offset-reset: latest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    properties:
        isolation-level: read_committed
  producer:
    client-id: test-producer
    acks: all
    retries: 3
    transaction-id-prefix: test-producer-tx-
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
      isolation-level: read_committed

Edit: more logs

[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata        : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(23309560<open>)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda37/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver  : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(23309560<open>)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

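That's the way Kafka transactions work. Published records are always written to the log, followed by a marker record that indicates whether the transaction was committed or rolled back.

To avoid seeing the rolled-back records, you have to set the consumer isolation.level property to read_committed (it is read_uncommitted by default).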
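
To make the marker mechanism concrete, here is a minimal framework-free sketch. It is a toy model, not Kafka code: the `Entry` and `Kind` types and the `readCommitted` / `readUncommitted` helpers are hypothetical names invented for illustration, and the model assumes each transaction id is used only once, which real producers do not do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical and are not Kafka classes.
final class IsolationLevelSketch {

    enum Kind { DATA, COMMIT, ABORT }

    // One log entry: a data record or a transaction marker, tagged with its transaction id.
    static final class Entry {
        final Kind kind;
        final String txId;
        final String value;

        Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    // read_uncommitted: every data record is visible, whatever its transaction's outcome.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                out.add(e.value);
            }
        }
        return out;
    }

    // read_committed: only data records whose transaction has a COMMIT marker are visible.
    static List<String> readCommitted(List<Entry> log) {
        Map<String, Kind> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                outcome.put(e.txId, e.kind);
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA && outcome.get(e.txId) == Kind.COMMIT) {
                out.add(e.value);
            }
        }
        return out;
    }

    // The aborted record is still in the log; only the marker says it was rolled back.
    static List<Entry> sampleLog() {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Kind.DATA, "tx-1", "rolled-back message"));
        log.add(new Entry(Kind.ABORT, "tx-1", null));
        log.add(new Entry(Kind.DATA, "tx-2", "committed message"));
        log.add(new Entry(Kind.COMMIT, "tx-2", null));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(readUncommitted(sampleLog()));
        System.out.println(readCommitted(sampleLog()));
    }
}
```

In this model the rolled-back record is physically present in the log, so a reader that ignores markers still sees it; only the read_committed style reader filters it out.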

EDIT

That's because you are starting a new transaction:

template.executeInTransaction(kt -> kt.send(record));

/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
@Nullable
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

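Just call template.send() and the template will participate in the transaction started by the container.

You can also remove the @Transactional from that method.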
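
The difference between the two calls can be sketched with a small framework-free model. This is only an illustration of the behavior described in the Javadoc above: `ToyTemplate` and its methods are hypothetical stand-ins, not Spring's `KafkaTemplate`, and the "transactions" are just in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyTemplate is a hypothetical stand-in, not Spring's KafkaTemplate.
final class LocalVersusOuterTransactionSketch {

    static final class ToyTemplate {
        final List<String> committed = new ArrayList<>();
        private List<String> outerTx; // records buffered by the outer (container) transaction

        void beginOuter() {
            outerTx = new ArrayList<>();
        }

        void commitOuter() {
            committed.addAll(outerTx);
            outerTx = null;
        }

        void rollbackOuter() {
            outerTx = null; // buffered records are discarded
        }

        // Joins the outer transaction: the record shares its fate.
        void send(String record) {
            if (outerTx == null) {
                throw new IllegalStateException("no transaction in progress");
            }
            outerTx.add(record);
        }

        // Runs in its own local transaction, which commits as soon as the call returns,
        // independently of any outer transaction.
        void executeInTransaction(String record) {
            List<String> localTx = new ArrayList<>();
            localTx.add(record);
            committed.addAll(localTx);
        }
    }

    // Simulates a listener that publishes one record and then fails.
    static List<String> run(boolean useLocalTransaction) {
        ToyTemplate template = new ToyTemplate();
        template.beginOuter();
        if (useLocalTransaction) {
            template.executeInTransaction("payload");
        } else {
            template.send("payload");
        }
        template.rollbackOuter(); // the listener throws, so the outer transaction rolls back
        return template.committed;
    }

    public static void main(String[] args) {
        System.out.println("executeInTransaction: " + run(true));
        System.out.println("send: " + run(false));
    }
}
```

In the model, the record published through the local transaction survives the outer rollback, which mirrors the symptom in the question; the record published with plain `send` does not.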

EDIT2

This works as expected for me...

spring.kafka.producer.transaction-id-prefix=tx-

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed

logging.level.org.springframework.transaction=trace
logging.level.org.springframework.kafka.core=trace

@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {

    public static void main(String[] args) {
        SpringApplication.run(So66306109Application.class, args);
    }

    @Autowired
    Foo foo;

    @Transactional // Not really needed; the container has already started it
    @KafkaListener(id = "so66306109", topics = "so66306109")
    public void listen(String in) {
        System.out.println(in);
        this.foo.send(in.toUpperCase());
        throw new RuntimeException("test");
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
    }


    @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
    public void listen2(String in) {
        System.out.println(in);
    }

}

@Component
class Foo {

    @Autowired
    KafkaTemplate<String, String> template;

    @Transactional // Not needed - we're already in a transaction
    void send(String in) {
        this.template.send("so66306109-2", in);
    }

}

EDIT3

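If you can't upgrade to a supported version, you need to disable transactions in the container and manage them yourself in your code, within the scope of the retry execution.

Here is an example.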

@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {

    public static void main(String[] args) {
        SpringApplication.run(So66306109Application.class, args);
    }

    @Autowired
    Foo foo;

    @Autowired
    RetryTemplate template;

    @KafkaListener(id = "so66306109", topics = "so66306109")
    public void listen(ConsumerRecord<String, String> in) {
        this.template.execute(context -> {
            System.out.println(in);
            this.foo.send(in);
            return null;
        }, context -> {
            System.out.println("RETRIES EXHAUSTED");
            return null;
        });
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
    public void listen2(String in) {
        System.out.println(in);
    }

    @Bean
    ChainedKafkaTransactionManager<String, String> chainedTm(KafkaTransactionManager<String, String> ktm,
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        // transactions can't be started by the container
        factory.getContainerProperties().setTransactionManager(null);
        return new ChainedKafkaTransactionManager<>(ktm);
    }

    @Bean
    public RetryTemplate template() {
        return new RetryTemplate();
    }

}

@Component
class Foo {

    @Autowired
    KafkaTemplate<String, String> template;

    @Autowired
    ProducerFactory<String, String> pf;

    @Transactional("chainedTm")
    public void send(ConsumerRecord<String,String> in) {
        // updateDB
        this.template.send(new ProducerRecord<String, String>("so66306109-2", null, null, in.value().toUpperCase()));
        this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
                new OffsetAndMetadata(in.offset() + 1)));

        // simulate a DB rollback
        KafkaResourceHolder<String, String> resource = (KafkaResourceHolder<String, String>) TransactionSynchronizationManager
                .getResource(this.pf);
        resource.setRollbackOnly();
    }

}

Note: you must not manually acknowledge such records; instead, send the offsets to the transaction before it commits.
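
Why the offset has to travel with the transaction can be illustrated with another small toy model. `ToyGroup` and its methods are hypothetical names, not Kafka or Spring classes; the sketch only assumes that an offset sent to a transaction takes effect when that transaction commits, and that the committed offset is the position of the next record to read (hence `in.offset() + 1` in the example above).

```java
// Toy model only: ToyGroup is a hypothetical stand-in for a consumer group's committed offset.
final class OffsetInTransactionSketch {

    static final class ToyGroup {
        long committedOffset;       // next offset the group will read
        private Long pendingOffset; // offset sent to the open transaction, not yet effective

        ToyGroup(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        // The committed offset is the position of the next record: processed offset + 1.
        void sendOffsetToTransaction(long processedOffset) {
            pendingOffset = processedOffset + 1;
        }

        void commit() {
            if (pendingOffset != null) {
                committedOffset = pendingOffset;
            }
            pendingOffset = null;
        }

        void abort() {
            pendingOffset = null; // the offset is discarded together with the records
        }
    }

    // Processes the record at recordOffset and returns where the group resumes reading.
    static long process(long recordOffset, boolean fail) {
        ToyGroup group = new ToyGroup(recordOffset);
        group.sendOffsetToTransaction(recordOffset);
        if (fail) {
            group.abort();
        } else {
            group.commit();
        }
        return group.committedOffset;
    }

    public static void main(String[] args) {
        System.out.println("after commit: " + process(5L, false));
        System.out.println("after abort:  " + process(5L, true));
    }
}
```

In the model, an aborted transaction leaves the committed offset untouched, so the same record is read again on the next attempt; a separate manual acknowledgment would break that coupling.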