Kafka Streams exactly-once delivery
My goal is to consume from topic A, do some processing, and produce to topic B, as a single atomic action. To achieve this I see two options:
- Use spring-kafka with a @KafkaListener and a KafkaTemplate (here).
- Use the Streams eos (exactly-once) capability.
I have successfully validated option #1. By "successfully" I mean that if my processing fails (an IllegalArgumentException is thrown), the consumed message from topic A keeps being consumed by the KafkaListener. This is what I expect, as the offset is not committed and a DefaultAfterRollbackProcessor is used.
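Roughly, option #1 looks like the following minimal sketch. It is an illustration only, not the exact code: the kafkaTemplate field and the listener container's transaction wiring (a KafkaTransactionManager plus the DefaultAfterRollbackProcessor mentioned above) are assumed and not shown.
// Hedged sketch of option #1: consume -> process -> produce as one Kafka transaction.
// Assumes the container factory is configured with a KafkaTransactionManager and a
// DefaultAfterRollbackProcessor; kafkaTemplate is a hypothetical autowired field.
@KafkaListener(topics = "A")
public void onInvoiceCreated(InvoiceEvents event) {
    // may throw IllegalArgumentException -> the transaction rolls back,
    // the offset is not committed, and the record is redelivered
    int tax = taxService.getTaxForInvoice(event);
    InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder()
            .setType(InvoiceEvent.TaxCalculated)
            .setTax(tax)
            .build();
    // the send participates in the same transaction as the offset commit
    kafkaTemplate.send("B", taxCalculatedEvent);
}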
I expect to see the same behaviour when, instead of a KafkaListener, I use a stream to consume from topic A, process, and send to topic B (option #2). But even though an IllegalArgumentException is thrown while I process, the message is consumed by the stream only once. Is this the expected behaviour?
In the Streams case, my only configuration is the following:
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }

    // required to create and start a new KafkaStreams instance, as the stream dies
    // when an exception is thrown; see:
    // https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }
}
My stream looks like this:
@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {
    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

    kStream
        .mapValues(v -> {
            // get tax from a possibly remote service;
            // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
            int tax = taxService.getTaxForInvoice(v);
            // create a TaxCalculated event
            InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder()
                    .setType(InvoiceEvent.TaxCalculated)
                    .setTax(tax)
                    .build();
            log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
            return taxCalculatedEvent;
        })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}
The happy-path Streams scenario works: when no exception is thrown during processing, the message appears correctly in topic B.
My unit test:
@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
            .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
    // ...
}
Update:
In my unit test I send 50 invoiceEvents (orderId=1,...,50), process them, and send them to a destination topic.
In my logs, the behaviour I see is the following:
invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {"id": "46", ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed
Why, after the 2nd failure, does it re-consume from invoiceEvent.orderId = 46?
The key points for making option 2 (Streams transactions) work are:
- Assign a Thread.UncaughtExceptionHandler() so that a new StreamThread is started after any uncaught exception (by default the StreamThread dies - see the code snippet below). This can happen even when producing to the Kafka broker fails; it does not have to be related to the business logic inside the stream.
- Consider setting a policy for handling deserialization of the messages you consume. Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc): for example, should you ignore a bad record and consume the next message, or stop consuming from the relevant Kafka partition? A configuration sketch follows the code snippet below.
- In the Streams case, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), consumed offsets and produced messages are still not committed per message. This leads to the situation described in the question (see "Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?").
- Kafka transactions do not yet work on Windows. The fix will be delivered in Kafka 1.1.1 (https://issues.apache.org/jira/browse/KAFKA-6052).
Consider checking how you handle serialization exceptions, or exceptions during producing in general (here and here); these are also covered in the sketch after the snippet below.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }
}
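To make the deserialization and production exception-handling policies from the list above concrete, here is a minimal sketch of my own (not from the original code); it simply extends the props map built in kStreamsConfigs() above. LogAndContinueExceptionHandler and DefaultProductionExceptionHandler ship with Kafka Streams (since roughly 1.0 and 1.1, respectively); whether skipping a bad record is acceptable is use-case specific, so treat these choices as assumptions rather than recommendations.
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

// Hedged sketch: a record that cannot be deserialized is logged and skipped;
// LogAndFailExceptionHandler (the default) would stop the task instead.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);

// On an exception while producing, the task fails (this is also the default);
// plug in a custom ProductionExceptionHandler to tolerate specific send failures.
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
        DefaultProductionExceptionHandler.class);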