使用 Kafka 的 Streams 处理不良消息 API
Handling bad messages using Kafka's Streams API
我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
我想知道处理 "bad messages" 的最佳方法。这可能是我无法正确反序列化的消息之类的事情,或者 processing/filtering 逻辑可能以某种意外的方式失败(我没有外部依赖性,所以应该没有那种瞬态错误)。
我正在考虑将我所有的 processing/filtering 代码包装在一个 try catch 中,如果出现异常则路由到 "error topic"。然后我可以研究消息并修改它或适当地修复我的代码,然后重放它以掌握它。如果我让任何异常传播,流似乎会被阻塞并且不会接收到更多消息。
- 这种方法是否被认为是最佳实践?
- 有没有方便的Kafka streams方法来处理这个问题?我认为没有 DLQ 的概念...
- 阻止 Kafka 在 "bad message" 上干扰的替代方法是什么?
- 有哪些替代错误处理方法?
为了完整起见,这是我的代码(伪代码):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
非常感谢任何帮助。
目前,Kafka Streams 仅提供有限的错误处理能力。目前正在进行简化此操作的工作。目前,您的整体方法似乎是一个不错的选择。
一条关于处理 de/serialization 错误的评论:手动处理这些错误,需要您执行 de/serialization "manually"。这意味着,您需要为您的 Streams 应用程序的 input/output 主题配置 ByteArraySerde
s 的键和值,并添加一个 map()
来执行 de/serialization(即 KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType>
——如果您还想捕获序列化异常,则反之亦然)。否则,您不能 try-catch
反序列化异常。
使用您当前的方法,您 "only" 验证给定的字符串是否代表有效文档 -- 但也可能是消息本身已损坏且无法转换为 String
首先在源运算符中。因此,您实际上并没有用代码覆盖反序列化异常。但是,如果您确定永远不会发生反序列化异常,那么您的方法就足够了。
更新
此问题已通过 KIP-161 解决,并将包含在下一个版本 1.0.0 中。它允许您通过参数 default.deserialization.exception.handler
注册回调。每次在反序列化过程中发生异常时都会调用该处理程序,并允许您 return 一个 DeserializationResponse
(CONTINUE
-> 删除记录并继续,或者 FAIL
即默认值)。
更新 2
使用 KIP-210(将成为 Kafka 1.1 的一部分)也可以在生产者端处理错误,类似于消费者部分,通过配置 ProductionExceptionHandler
注册 [=20] =] 可以 return CONTINUE
.
2018 年 3 月 23 日更新: Kafka 1.0 通过 [=29= 为错误消息 ("poison pills") 提供了更好更容易的处理] 在 Kafka 1.0 文档中。
This could potentially be things like messages that I can't deserialize properly [...]
好的,我在这里的回答主要集中在(反)序列化问题上,因为这对于大多数用户来说可能是最棘手的情况。
[...] or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).
同样的思路(对于反序列化)也可以应用于处理逻辑中的失败。在这里,大多数人倾向于选择下面的选项 2(减去反序列化部分),但是 YMMV。
I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.
- Is this approach considered best practice?
是的,目前这是要走的路。本质上,两种最常见的模式是 (1) 跳过损坏的消息或 (2) 将损坏的记录发送到隔离主题,即死信队列。
- Is there a convenient Kafka streams way to handle this? I don't think there is a concept of a DLQ...
是的,有一种方法可以处理这个问题,包括使用死信队列。但是,它(至少恕我直言)还不是那么方便。如果您对 API 应该如何处理这个问题有任何反馈——例如通过新的或更新的方法,配置设置("if serialization/deserialization fails send the problematic record to THIS quarantine topic")——请告诉我们。 :-)
- What are the alternative ways to stop Kafka jamming on a "bad message"?
- What alternative error handling approaches are there?
请参阅下面的示例。
FWIW,Kafka 社区也在讨论添加一个新的 CLI 工具,它允许您跳过损坏的消息。但是,作为 Kafka Streams 的用户 API,我认为理想情况下,您希望直接在代码中处理此类场景,并且仅在万不得已时才回退到 CLI 实用程序。
以下是 Kafka Streams DSL 处理损坏的 records/messages 又名 "poison pills" 的一些模式。这取自 http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
选项 1:使用 flatMap
跳过损坏的记录
这可以说是大多数用户想要做的事情。
- 我们使用
flatMap
是因为它允许您在每个输入记录中输出零个、一个或多个输出记录。在记录损坏的情况下,我们不输出任何内容(零记录),因此 ignoring/skipping 损坏的记录。
- 与此处列出的其他方法相比,此方法的好处:我们只需手动反序列化一条记录一次!
- 这种方法的缺点:
flatMap
"marks" 潜在数据重新分区的输入流,即如果您执行基于键的操作,例如分组 (groupBy
/groupByKey
) 或之后加入,您的数据将在幕后重新分区。由于这可能是一个代价高昂的步骤,我们不希望这种情况不必要地发生。如果您知道记录密钥始终有效或者您不需要对密钥进行操作(因此将它们保持为 byte[]
格式的 "raw" 密钥),您可以从 flatMap
到 flatMapValues
,即使您稍后 join/group/aggregate 流也不会导致数据重新分区。
代码示例:
Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);
// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
(k, v) -> {
try {
// Attempt deserialization
String key = stringSerde.deserializer().deserialize(inputTopic, k);
long value = longSerde.deserializer().deserialize(inputTopic, v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return Collections.emptyList();
}
);
选项 2:具有 branch
的死信队列
与选项 1(忽略损坏的记录)相比,选项 2 通过从 "main" 输入流中过滤掉损坏的消息并将它们写入隔离主题(想想:死信队列)来保留损坏的消息。缺点是,对于有效的记录,我们必须支付两次手动反序列化成本。
KStream<byte[], byte[]> input = ...;
KStream<byte[], byte[]>[] partitioned = input.branch(
(k, v) -> {
boolean isValidRecord = false;
try {
stringSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException ignored) {}
return isValidRecord;
},
(k, v) -> true
);
// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
选项 3:使用 filter
跳过损坏的记录
我只是为了完整性才提到这一点。这个选项看起来像是选项 1 和选项 2 的混合体,但比其中任何一个都差。与选项 1 相比,您必须为有效记录支付两次手动反序列化成本(糟糕!)。与选项 2 相比,您无法在死信队列中保留损坏的记录。
KStream<byte[], byte[]> validRecordsOnly = input.filter(
(k, v) -> {
boolean isValidRecord = false;
try {
bytesSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return isValidRecord;
}
);
KStream<String, Long> doubled = validRecordsOnly.map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
Any help greatly appreciated.
希望能帮到你。如果是,我将感谢您对我们如何改进 Kafka Streams API 以比现在更方便的方式 better/more 处理 failures/exceptions 的反馈。 :-)
我认为这些示例在使用 Avro 时根本不起作用。
当模式无法解析时(例如,有 bad/non-avro 消息破坏主题)首先没有 key
或 value
反序列化因为在调用 DSL .branch()
代码时,已经抛出(或处理)了异常。
任何人都可以确认我是否确实如此?使用 Avro 时,您在这里提到的非常流畅的方法是不可能的?
KIP-161 确实解释了如何使用处理程序,但是,将其视为拓扑的一部分会更加流畅。
对于处理逻辑,您可以采用这种方法:
someKStream
.mapValues(inputValue -> {
// for each execution the below "return" could provide a different class than the previous run!
// e.g. "return isFailedProcessing ? failValue : successValue;"
// where failValue and successValue have no related classes
return someObject; // someObject class vary at runtime depending on your business
}) // here you'll have KStream<whateverKeyClass, Object> -> yes, Object for the value!
// you could have a different logic for choosing
// the target topic, below is just an example
.to((k, v, recordContext) -> v instanceof failValueClass ?
"dead-letter-topic" : "success-topic",
// you could completelly ignore the "Produced" part
// and rely on spring-boot properties only, e.g.
// spring.kafka.streams.properties.default.key.serde=yourKeySerde
// spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
Produced.with(yourKeySerde,
// JsonSerde could be an instance configured as you need
// (with type mappings or headers setting disabled, etc)
new JsonSerde<>()));
您的 class 虽然不同并且涉及不同的主题,但将按预期进行序列化。
当不使用to()
,而是想继续其他处理时,他可以使用branch()
,根据kafka值class拆分逻辑; branch()
的技巧是 return KStream<keyClass, ?>[]
以进一步允许将单个数组项转换为适当的 class。
如果你想发送异常(自定义异常)到另一个主题(ERROR_TOPIC_NAME):
@Bean
public KStream<String, ?> kafkaStreamInput(StreamsBuilder kStreamBuilder) {
KStream<String, InputModel> input = kStreamBuilder.stream(INPUT_TOPIC_NAME);
return service.messageHandler(input);
}
public KStream<String, ?> messageHandler(KStream<String, InputModel> inputTopic) {
KStream<String, Object> output;
output = inputTopic.mapValues(v -> {
try {
//return InputModel
return normalMethod(v);
} catch (Exception e) {
//return ErrorModel
return errorHandler(e);
}
});
output.filter((k, v) -> (v instanceof ErrorModel)).to(KafkaStreamsConfig.ERROR_TOPIC_NAME);
output.filter((k, v) -> (v instanceof InputModel)).to(KafkaStreamsConfig.OUTPUT_TOPIC_NAME);
return output;
}
如果要处理Kafka异常,跳过:
@Autowired
public ConsumerErrorHandler(
KafkaProducer<String, ErrorModel> dlqProducer) {
this.dlqProducer = dlqProducer;
}
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());
factory.setErrorHandler(((exception, data) -> {
ErrorModel errorModel = ErrorModel.builder().message()
.status("500").build();
assert data != null;
dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, data.key().toString(), errorModel));
}));
return factory;
}
以上所有答案虽然有效且有用,但它们都假设您的流拓扑是无状态的。例如回到原来的例子,
master topic -> my processing in a mapper/filter -> output topics
“我在 mapper/filter 中的处理应该是无状态的。 IE。不是 re-partitioning(也就是写一个持久的 re-partition 主题)或做一个 toTable()
(也就是写一个更新日志主题)。如果处理在拓扑结构中进一步失败并且您提交了事务(通过遵循上面提到的 3 个选项中的任何一个 - 平面图,分支或过滤器 - 那么您必须手动或以编程方式最终删除不一致的状态。这意味着编写额外的自动执行此操作的自定义代码。
我个人希望 Streams 也为任何未处理的运行时异常提供 LogAndSkip
选项,而不仅仅是反序列化和生产异常。
有人对此有什么想法吗?
我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
我想知道处理 "bad messages" 的最佳方法。这可能是我无法正确反序列化的消息之类的事情,或者 processing/filtering 逻辑可能以某种意外的方式失败(我没有外部依赖性,所以应该没有那种瞬态错误)。
我正在考虑将我所有的 processing/filtering 代码包装在一个 try catch 中,如果出现异常则路由到 "error topic"。然后我可以研究消息并修改它或适当地修复我的代码,然后重放它以掌握它。如果我让任何异常传播,流似乎会被阻塞并且不会接收到更多消息。
- 这种方法是否被认为是最佳实践?
- 有没有方便的Kafka streams方法来处理这个问题?我认为没有 DLQ 的概念...
- 阻止 Kafka 在 "bad message" 上干扰的替代方法是什么?
- 有哪些替代错误处理方法?
为了完整起见,这是我的代码(伪代码):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
非常感谢任何帮助。
目前,Kafka Streams 仅提供有限的错误处理能力。目前正在进行简化此操作的工作。目前,您的整体方法似乎是一个不错的选择。
一条关于处理 de/serialization 错误的评论:手动处理这些错误,需要您执行 de/serialization "manually"。这意味着,您需要为您的 Streams 应用程序的 input/output 主题配置 ByteArraySerde
s 的键和值,并添加一个 map()
来执行 de/serialization(即 KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType>
——如果您还想捕获序列化异常,则反之亦然)。否则,您不能 try-catch
反序列化异常。
使用您当前的方法,您 "only" 验证给定的字符串是否代表有效文档 -- 但也可能是消息本身已损坏且无法转换为 String
首先在源运算符中。因此,您实际上并没有用代码覆盖反序列化异常。但是,如果您确定永远不会发生反序列化异常,那么您的方法就足够了。
更新
此问题已通过 KIP-161 解决,并将包含在下一个版本 1.0.0 中。它允许您通过参数 default.deserialization.exception.handler
注册回调。每次在反序列化过程中发生异常时都会调用该处理程序,并允许您 return 一个 DeserializationResponse
(CONTINUE
-> 删除记录并继续,或者 FAIL
即默认值)。
更新 2
使用 KIP-210(将成为 Kafka 1.1 的一部分)也可以在生产者端处理错误,类似于消费者部分,通过配置 ProductionExceptionHandler
注册 [=20] =] 可以 return CONTINUE
.
2018 年 3 月 23 日更新: Kafka 1.0 通过 [=29= 为错误消息 ("poison pills") 提供了更好更容易的处理] 在 Kafka 1.0 文档中。
This could potentially be things like messages that I can't deserialize properly [...]
好的,我在这里的回答主要集中在(反)序列化问题上,因为这对于大多数用户来说可能是最棘手的情况。
[...] or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).
同样的思路(对于反序列化)也可以应用于处理逻辑中的失败。在这里,大多数人倾向于选择下面的选项 2(减去反序列化部分),但是 YMMV。
I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.
- Is this approach considered best practice?
是的,目前这是要走的路。本质上,两种最常见的模式是 (1) 跳过损坏的消息或 (2) 将损坏的记录发送到隔离主题,即死信队列。
- Is there a convenient Kafka streams way to handle this? I don't think there is a concept of a DLQ...
是的,有一种方法可以处理这个问题,包括使用死信队列。但是,它(至少恕我直言)还不是那么方便。如果您对 API 应该如何处理这个问题有任何反馈——例如通过新的或更新的方法,配置设置("if serialization/deserialization fails send the problematic record to THIS quarantine topic")——请告诉我们。 :-)
- What are the alternative ways to stop Kafka jamming on a "bad message"?
- What alternative error handling approaches are there?
请参阅下面的示例。
FWIW,Kafka 社区也在讨论添加一个新的 CLI 工具,它允许您跳过损坏的消息。但是,作为 Kafka Streams 的用户 API,我认为理想情况下,您希望直接在代码中处理此类场景,并且仅在万不得已时才回退到 CLI 实用程序。
以下是 Kafka Streams DSL 处理损坏的 records/messages 又名 "poison pills" 的一些模式。这取自 http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
选项 1:使用 flatMap
这可以说是大多数用户想要做的事情。
- 我们使用
flatMap
是因为它允许您在每个输入记录中输出零个、一个或多个输出记录。在记录损坏的情况下,我们不输出任何内容(零记录),因此 ignoring/skipping 损坏的记录。 - 与此处列出的其他方法相比,此方法的好处:我们只需手动反序列化一条记录一次!
- 这种方法的缺点:
flatMap
"marks" 潜在数据重新分区的输入流,即如果您执行基于键的操作,例如分组 (groupBy
/groupByKey
) 或之后加入,您的数据将在幕后重新分区。由于这可能是一个代价高昂的步骤,我们不希望这种情况不必要地发生。如果您知道记录密钥始终有效或者您不需要对密钥进行操作(因此将它们保持为byte[]
格式的 "raw" 密钥),您可以从flatMap
到flatMapValues
,即使您稍后 join/group/aggregate 流也不会导致数据重新分区。
代码示例:
Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);
// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
(k, v) -> {
try {
// Attempt deserialization
String key = stringSerde.deserializer().deserialize(inputTopic, k);
long value = longSerde.deserializer().deserialize(inputTopic, v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return Collections.emptyList();
}
);
选项 2:具有 branch
与选项 1(忽略损坏的记录)相比,选项 2 通过从 "main" 输入流中过滤掉损坏的消息并将它们写入隔离主题(想想:死信队列)来保留损坏的消息。缺点是,对于有效的记录,我们必须支付两次手动反序列化成本。
KStream<byte[], byte[]> input = ...;
KStream<byte[], byte[]>[] partitioned = input.branch(
(k, v) -> {
boolean isValidRecord = false;
try {
stringSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException ignored) {}
return isValidRecord;
},
(k, v) -> true
);
// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
选项 3:使用 filter
我只是为了完整性才提到这一点。这个选项看起来像是选项 1 和选项 2 的混合体,但比其中任何一个都差。与选项 1 相比,您必须为有效记录支付两次手动反序列化成本(糟糕!)。与选项 2 相比,您无法在死信队列中保留损坏的记录。
KStream<byte[], byte[]> validRecordsOnly = input.filter(
(k, v) -> {
boolean isValidRecord = false;
try {
bytesSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return isValidRecord;
}
);
KStream<String, Long> doubled = validRecordsOnly.map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
Any help greatly appreciated.
希望能帮到你。如果是,我将感谢您对我们如何改进 Kafka Streams API 以比现在更方便的方式 better/more 处理 failures/exceptions 的反馈。 :-)
我认为这些示例在使用 Avro 时根本不起作用。
当模式无法解析时(例如,有 bad/non-avro 消息破坏主题)首先没有 key
或 value
反序列化因为在调用 DSL .branch()
代码时,已经抛出(或处理)了异常。
任何人都可以确认我是否确实如此?使用 Avro 时,您在这里提到的非常流畅的方法是不可能的?
KIP-161 确实解释了如何使用处理程序,但是,将其视为拓扑的一部分会更加流畅。
对于处理逻辑,您可以采用这种方法:
someKStream
.mapValues(inputValue -> {
// for each execution the below "return" could provide a different class than the previous run!
// e.g. "return isFailedProcessing ? failValue : successValue;"
// where failValue and successValue have no related classes
return someObject; // someObject class vary at runtime depending on your business
}) // here you'll have KStream<whateverKeyClass, Object> -> yes, Object for the value!
// you could have a different logic for choosing
// the target topic, below is just an example
.to((k, v, recordContext) -> v instanceof failValueClass ?
"dead-letter-topic" : "success-topic",
// you could completelly ignore the "Produced" part
// and rely on spring-boot properties only, e.g.
// spring.kafka.streams.properties.default.key.serde=yourKeySerde
// spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
Produced.with(yourKeySerde,
// JsonSerde could be an instance configured as you need
// (with type mappings or headers setting disabled, etc)
new JsonSerde<>()));
您的 class 虽然不同并且涉及不同的主题,但将按预期进行序列化。
当不使用to()
,而是想继续其他处理时,他可以使用branch()
,根据kafka值class拆分逻辑; branch()
的技巧是 return KStream<keyClass, ?>[]
以进一步允许将单个数组项转换为适当的 class。
如果你想发送异常(自定义异常)到另一个主题(ERROR_TOPIC_NAME):
@Bean
public KStream<String, ?> kafkaStreamInput(StreamsBuilder kStreamBuilder) {
KStream<String, InputModel> input = kStreamBuilder.stream(INPUT_TOPIC_NAME);
return service.messageHandler(input);
}
public KStream<String, ?> messageHandler(KStream<String, InputModel> inputTopic) {
KStream<String, Object> output;
output = inputTopic.mapValues(v -> {
try {
//return InputModel
return normalMethod(v);
} catch (Exception e) {
//return ErrorModel
return errorHandler(e);
}
});
output.filter((k, v) -> (v instanceof ErrorModel)).to(KafkaStreamsConfig.ERROR_TOPIC_NAME);
output.filter((k, v) -> (v instanceof InputModel)).to(KafkaStreamsConfig.OUTPUT_TOPIC_NAME);
return output;
}
如果要处理Kafka异常,跳过:
@Autowired
public ConsumerErrorHandler(
KafkaProducer<String, ErrorModel> dlqProducer) {
this.dlqProducer = dlqProducer;
}
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());
factory.setErrorHandler(((exception, data) -> {
ErrorModel errorModel = ErrorModel.builder().message()
.status("500").build();
assert data != null;
dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, data.key().toString(), errorModel));
}));
return factory;
}
以上所有答案虽然有效且有用,但它们都假设您的流拓扑是无状态的。例如回到原来的例子,
master topic -> my processing in a mapper/filter -> output topics
“我在 mapper/filter 中的处理应该是无状态的。 IE。不是 re-partitioning(也就是写一个持久的 re-partition 主题)或做一个 toTable()
(也就是写一个更新日志主题)。如果处理在拓扑结构中进一步失败并且您提交了事务(通过遵循上面提到的 3 个选项中的任何一个 - 平面图,分支或过滤器 - 那么您必须手动或以编程方式最终删除不一致的状态。这意味着编写额外的自动执行此操作的自定义代码。
我个人希望 Streams 也为任何未处理的运行时异常提供 LogAndSkip
选项,而不仅仅是反序列化和生产异常。
有人对此有什么想法吗?