Spring Cloud Stream database transaction does not roll back
I am trying to write a spring-cloud-stream function (spring-starter-parent 2.5.3, Java 11, spring-cloud version 2020.0.3) that uses both Kafka and Postgres transactions. The function throws a simulated error whenever a consumed message starts with the string "fail", and I expect this to cause the database transaction to roll back, which should then cause the Kafka transaction to roll back. (I understand that Kafka transactions are not XA, which is fine.) So far I have not been able to get the database transaction to work, although the Kafka transaction does.
Currently I am using the @Transactional annotation, which does not appear to start a database transaction. (The Kafka binder documentation recommends synchronizing database and Kafka transactions using the ChainedTransactionManager, but the Spring Kafka documentation states it is deprecated in favor of the @Transactional annotation, and the Spring Cloud Stream example for this problem uses @Transactional with the default transaction manager created by the starter-jpa library, I believe.) I can see in my debugger that, regardless of whether I use @EnableTransactionManagement and @Transactional on my consumer, the consumer is executed inside a Kafka transaction by a transaction template higher up the stack, but I cannot see a database transaction anywhere.
I have a few questions I would like to understand:
- Am I correct that the Kafka listener container runs my consumer in the context of a Kafka transaction regardless of whether I have the @Transactional annotation? If so, is there a way to run only specific functions inside a Kafka transaction?
- Since the container cannot intercept calls to the producer (as far as I know), does the above change for the producer?
- What should I do to synchronize the Kafka and database transactions so that the database commit happens before the Kafka commit?
I have the following CrudRepository, collection of handlers, and application.yml:
@Repository
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {

  /**
   * Create a new audit log entry if and only if another with the same message does not already
   * exist. This is idempotent.
   */
  @Transactional
  @Modifying
  @Query(
      nativeQuery = true,
      value = "insert into audit_log (message) values (?1) on conflict (message) do nothing")
  void createIfNotExists(String message);
}
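For context, the native upsert above relies on a unique constraint on `message`; a minimal sketch of the backing DDL (the `id` column and types are assumptions, only `message` appears in the question) would be:

```sql
-- Hypothetical schema: the unique constraint on "message" is what makes
-- "on conflict (message) do nothing" valid in Postgres.
create table audit_log (
    id      bigserial primary key,
    message text not null unique
);
```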
@Profile("ft")
@Configuration
@EnableTransactionManagement
public class FaultTolerantHandlers {

  private static final Logger LOGGER = LoggerFactory.getLogger(FaultTolerantHandlers.class);

  @Bean
  public NewTopic inputTopic() {
    return TopicBuilder.name("input").partitions(1).replicas(1).build();
  }

  @Bean
  public NewTopic inputDltTopic() {
    return TopicBuilder.name("input.DLT").partitions(1).build();
  }

  @Bean
  public NewTopic leftTopic() {
    return TopicBuilder.name("left").partitions(1).build();
  }

  @Bean
  public NewTopic rightTopic() {
    return TopicBuilder.name("right").partitions(1).build();
  }

  @Bean
  public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
      LOGGER.info("Producing messages to input...");
      template.send("input", "pass-1".getBytes());
      template.send("input", "fail-1".getBytes());
      template.send("input", "pass-2".getBytes());
      template.send("input", "fail-2".getBytes());
      LOGGER.info("Produced input.");
    };
  }

  @Bean
  ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
      BinderFactory binders) {
    return (container, dest, group) -> {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
              .getTransactionalProducerFactory();
      KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(requireNonNull(pf));
      container.setAfterRollbackProcessor(
          new DefaultAfterRollbackProcessor<>(
              new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L)));
    };
  }

  // Receive messages from `input`.
  // For each input, write an audit log to the database.
  // For each input, produce a message to both `left` and `right` atomically.
  // After three failed attempts to achieve the above, shuffle the message
  // off to `input.DLT` and move on.
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {
    return input -> {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);
      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }
      bridge.send("right", ("right-" + input).getBytes());
    };
  }

  @Bean
  public Consumer<Message<String>> logger() {
    return message -> {
      var receivedTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
      LOGGER.info("Received on topic=" + receivedTopic + " payload=" + message.getPayload());
    };
  }
}
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
          required-acks: all
      bindings:
        persistAndSplit-in-0:
          destination: input
          group: input
        logger-in-0:
          destination: left,right,input.DLT
          group: logger
          consumer:
            properties:
              isolation.level: read_committed
      function:
        definition: persistAndSplit;logger
Thanks!
@Bean
@Transactional
public Consumer<String> persistAndSplit(
    StreamBridge bridge,
    AuditLogRepository repository
) {

Here, @Transactional is on the bean definition, which is executed only once, during application initialization; to get a runtime transaction, the code inside the lambda needs to be annotated instead; something like...
@Bean
public Consumer<String> persistAndSplit(
    StreamBridge bridge,
    AuditLogRepository repository,
    TxCode code
) {
  return code::run;
}
@Component
class TxCode {

  @Autowired
  AuditLogRepository repository;

  @Autowired
  StreamBridge bridge;

  @Transactional
  void run(String input) {
    bridge.send("left", ("left-" + input).getBytes());
    repository.createIfNotExists(input);
    if (input.startsWith("fail")) {
      throw new RuntimeException("Simulated error");
    }
    bridge.send("right", ("right-" + input).getBytes());
  }
}
(Or you can pass the bridge and repo in instead):

return str -> code.run(str, repository, bridge);
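To see why the annotation's placement matters, here is a plain-Java sketch (no Spring; all names are illustrative): the body of a @Bean factory method runs once at startup, while the lambda it returns runs later, once per message, outside any proxy or interceptor that wraps the factory method itself.

```java
import java.util.function.Consumer;

public class BeanTimingDemo {

    // Records the order in which code actually executes.
    static final StringBuilder log = new StringBuilder();

    // Stands in for the @Bean factory method: any advice wrapped around THIS
    // call (e.g. @Transactional on the @Bean method) covers only bean
    // creation, never the later per-message invocations of the lambda.
    static Consumer<String> persistAndSplit() {
        log.append("factory;"); // executed once, during "startup"
        return input -> log.append("handle:").append(input).append(";");
    }

    public static void main(String[] args) {
        Consumer<String> consumer = persistAndSplit(); // application initialization
        consumer.accept("pass-1"); // message delivery happens much later,
        consumer.accept("fail-1"); // outside the factory-method call
        System.out.println(log);   // prints: factory;handle:pass-1;handle:fail-1;
    }
}
```

This is why moving @Transactional onto `TxCode.run` works: the proxy then wraps each per-message call rather than the one-time bean creation.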