Spring Cloud Stream Multi-Topic Transaction Management
I am trying to create a PoC application in Java to figure out how to do transaction management in Spring Cloud Stream when using Kafka for message publishing. The use case I am trying to simulate is a processor that receives a message, does some processing, and generates two new messages destined for two different topics. I would like to be able to handle the publishing of both messages as a single transaction, so if publishing the second message fails, I want to roll back (not commit) the first message. Does Spring Cloud Stream support such a use case?
I have set the @Transactional annotation and I can see that a global transaction starts before the message is delivered to the consumer. However, when I try to publish a message via the MessageChannel.send() method, I can see that a new local transaction is started and completed in the KafkaProducerMessageHandler class's handleRequestMessage() method. In other words, the sending of the message does not participate in the global transaction. So if an exception is thrown after the first message is published, that message is not rolled back. The global transaction is rolled back, but that effectively does nothing because the first message has already been committed.
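For reference, a minimal sketch of the @Transactional placement (the full listing below omits the annotation, so putting it on the listener method is shown here as an assumption for illustration):
@StreamListener("input-customer-data-change-topic")
@Transactional
public void handleCustomerDataChangeMessages(@Payload final ImmutableCustomerDetails customerDetails)
{
    // both sends performed by the service should participate in this transaction
    mService.processMessage(customerDetails);
}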
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer: # these apply to all producers that participate in the transaction
              partition-key-extractor-name: partitionKeyExtractorStrategy
              partition-selector-name: partitionSelectorStrategy
              partition-count: 3
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
        bindings:
          input-customer-data-change-topic:
            consumer:
              configuration:
                isolation:
                  level: read_committed
              enable-dlq: true
      bindings:
        input-customer-data-change-topic:
          content-type: application/json
          destination: com.fis.customer
          group: com.fis.ec
          consumer:
            partitioned: true
            max-attempts: 1
        output-name-change-topic:
          content-type: application/json
          destination: com.fis.customer.name
        output-email-change-topic:
          content-type: application/json
          destination: com.fis.customer.email
@SpringBootApplication
@EnableBinding(CustomerDataChangeStreams.class)
public class KafkaCloudStreamCustomerDemoApplication
{
    public static void main(final String[] args)
    {
        SpringApplication.run(KafkaCloudStreamCustomerDemoApplication.class, args);
    }
}
public interface CustomerDataChangeStreams
{
    @Input("input-customer-data-change-topic")
    SubscribableChannel inputCustomerDataChange();

    @Output("output-email-change-topic")
    MessageChannel outputEmailDataChange();

    @Output("output-name-change-topic")
    MessageChannel outputNameDataChange();
}
@Component
public class CustomerDataChangeListener
{
    @Autowired
    private CustomerDataChangeProcessor mService;

    @StreamListener("input-customer-data-change-topic")
    public void handleCustomerDataChangeMessages(
        @Payload final ImmutableCustomerDetails customerDetails)
    {
        // processMessage() returns void, so the listener must not declare a return type
        mService.processMessage(customerDetails);
    }
}
@Component
public class CustomerDataChangeProcessor
{
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomerDataChangeProcessor.class);

    private final CustomerDataChangeStreams mStreams;

    @Value("${spring.cloud.stream.bindings.output-email-change-topic.destination}")
    private String mEmailChangeTopic;

    @Value("${spring.cloud.stream.bindings.output-name-change-topic.destination}")
    private String mNameChangeTopic;

    public CustomerDataChangeProcessor(final CustomerDataChangeStreams streams)
    {
        mStreams = streams;
    }

    public void processMessage(final CustomerDetails customerDetails)
    {
        try
        {
            sendNameMessage(customerDetails);
            sendEmailMessage(customerDetails);
        }
        catch (final JSONException ex)
        {
            LOGGER.error("Failed to send messages.", ex);
        }
    }

    public void sendNameMessage(final CustomerDetails customerDetails)
        throws JSONException
    {
        final JSONObject nameChangeDetails = new JSONObject();
        nameChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
        nameChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
        nameChangeDetails.put(KafkaConst.FIRST_NAME_KEY, customerDetails.firstName());
        nameChangeDetails.put(KafkaConst.LAST_NAME_KEY, customerDetails.lastName());
        final String action = customerDetails.action();
        nameChangeDetails.put(KafkaConst.ACTION_KEY, action);
        final MessageChannel nameChangeMessageChannel = mStreams.outputNameDataChange();
        nameChangeMessageChannel.send(MessageBuilder.withPayload(nameChangeDetails.toString())
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader(KafkaHeaders.TOPIC, mNameChangeTopic).build());
        if ("fail_name_illegal".equalsIgnoreCase(action))
        {
            throw new IllegalArgumentException("Customer name failure!");
        }
    }

    public void sendEmailMessage(final CustomerDetails customerDetails) throws JSONException
    {
        final JSONObject emailChangeDetails = new JSONObject();
        emailChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
        emailChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
        emailChangeDetails.put(KafkaConst.EMAIL_ADDRESS_KEY, customerDetails.email());
        final String action = customerDetails.action();
        emailChangeDetails.put(KafkaConst.ACTION_KEY, action);
        final MessageChannel emailChangeMessageChannel = mStreams.outputEmailDataChange();
        emailChangeMessageChannel.send(MessageBuilder.withPayload(emailChangeDetails.toString())
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader(KafkaHeaders.TOPIC, mEmailChangeTopic).build());
        if ("fail_email_illegal".equalsIgnoreCase(action))
        {
            throw new IllegalArgumentException("E-mail address failure!");
        }
    }
}
EDIT
We are getting closer. The local transaction is no longer created. However, the global transaction is still committed even when there is an exception. As far as I can tell, the exception does not propagate to the TransactionTemplate.execute() method, so the transaction is committed. The sendMessage() method in the MessageProducerSupport class appears to "swallow" the exception in its catch clause: if an error channel is defined, a message is published to it and the exception is not re-thrown. I tried turning the error channel off (spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled = false), but that did not disable it. So, just for a test, I set the error channel to null in the debugger to force the exception to be re-thrown. That seemed to do it. However, the original message keeps getting redelivered to the initial consumer even though I have max-attempts set to 1 for that consumer.
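For reference, this is where that setting would sit in the YAML above (a sketch of the attempted property; as described, it did not actually disable the error channel in my tests):
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            producer:
              # attempted: suppress the error channel so the exception is re-thrown
              error-channel-enabled: false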
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
Default: null (no transactions)
spring.cloud.stream.kafka.binder.transaction.producer.*
Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix and Kafka Producer Properties and the general producer properties supported by all binders.
Default: See individual producer properties.
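So, per those docs, enabling binder transactions only requires setting the prefix plus any shared producer properties, for example (a minimal sketch; the full working configuration appears later in this answer):
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx.  # enables transactions in the binder
            producer:                   # shared by all transactional producers
              configuration:
                acks: all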
You must configure the shared global producer.
Do not add @Transactional - the container will start the transaction and send the offsets to the transaction before committing it.
If the listener throws an exception, the transaction is rolled back and the DefaultAfterRollbackProcessor will re-seek the topics/partitions so that the record will be redelivered.
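Conceptually, the transactional container does something like the following around each delivery (a sketch of the mechanism only, not the actual container code; template, record, listener, and group are placeholder names):
// Conceptual sketch: placeholders are 'template' (a transactional KafkaTemplate),
// 'record' (the polled ConsumerRecord), 'listener' (the user listener), and
// 'group' (the consumer group id).
void processTransactionally(KafkaTemplate<String, String> template,
        ConsumerRecord<String, String> record,
        MessageListener<String, String> listener, String group) {
    template.executeInTransaction(t -> {
        listener.onMessage(record); // the user listener runs inside the Kafka transaction
        // the consumed offset is sent to the same transaction, so it is only
        // committed if the produced messages commit
        t.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)),
                group);
        return null;
    });
}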
EDIT
There is a bug in the binder's transaction manager configuration that causes the output binding to start a new local transaction.
To work around it, reconfigure the TM with the following container customizer bean...
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
    return (container, dest, group) -> {
        // make the binder's transaction manager synchronize with the container's
        // transaction instead of letting the output binding run a local one
        KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                .getTransactionManager();
        tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    };
}
EDIT2
You cannot use the binder's DLQ support because, from the container's perspective, the delivery was successful. We need the exception to propagate to the container to force a rollback, so you have to move the dead-lettering to the AfterRollbackProcessor instead. Here is my complete test class:
@SpringBootApplication
@EnableBinding(Processor.class)
public class So57379575Application {

    public static void main(String[] args) {
        SpringApplication.run(So57379575Application.class, args);
    }

    @Autowired
    private MessageChannel output;

    @StreamListener(Processor.INPUT)
    public void listen(String in) {
        System.out.println("in:" + in);
        this.output.send(new GenericMessage<>(in.toUpperCase()));
        if (in.equals("two")) {
            throw new RuntimeException("fail");
        }
    }

    @KafkaListener(id = "so57379575", topics = "so57379575out")
    public void listen2(String in) {
        System.out.println("out:" + in);
    }

    @KafkaListener(id = "so57379575DLT", topics = "so57379575dlt")
    public void listen3(String in) {
        System.out.println("dlt:" + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("so57379575in", "one".getBytes());
            template.send("so57379575in", "two".getBytes());
        };
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            KafkaTemplate<Object, Object> template) {
        return (container, dest, group) -> {
            // enable transaction synchronization
            KafkaTransactionManager<?, ?> tm = (KafkaTransactionManager<?, ?>) container.getContainerProperties()
                    .getTransactionManager();
            tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
            // container dead-lettering
            DefaultAfterRollbackProcessor<? super byte[], ? super byte[]> afterRollbackProcessor =
                    new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(template,
                            (ex, tp) -> new TopicPartition("so57379575dlt", -1)), 0);
            container.setAfterRollbackProcessor(afterRollbackProcessor);
        };
    }

}
and
spring:
  kafka:
    bootstrap-servers:
      - 10.0.0.8:9092
      - 10.0.0.8:9093
      - 10.0.0.8:9094
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        isolation.level: read_committed
  cloud:
    stream:
      bindings:
        input:
          destination: so57379575in
          group: so57379575in
          consumer:
            max-attempts: 1
        output:
          destination: so57379575out
      kafka:
        binder:
          transaction:
            transaction-id-prefix: so57379575tx.
            producer:
              configuration:
                acks: all
                retries: 10
#logging:
#  level:
#    org.springframework.kafka: trace
#    org.springframework.transaction: trace
and the output - note that "two" is rolled back and lands on the DLT, and only ONE (never TWO) is published to the output topic:
in:two
2019-08-07 12:43:33.457 ERROR 36532 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while
...
Caused by: java.lang.RuntimeException: fail
...
in:one
dlt:two
out:ONE