Example on handling processing exceptions in Spring Cloud Stream with the Kafka Streams Binder and functional style processors
I am using Spring Cloud Stream with the Kafka Streams Binder, the functional style processor API and multiple processors.
It is really cool to configure a processing application with multiple processors and multiple Kafka topics this way while staying in the Spring Boot universe with /actuator, WebClient etc. Actually I like it better than using plain Apache Kafka Streams.
But: I would like to integrate exception handling for exceptions that occur inside the processors and send these unprocessable messages to a DLQ. I have already set up DLQs for deserialization errors, but apart from sobychacko's answer on a similar question I have not found any good advice on how to achieve this. And that answer is only a snippet! Does anybody have a more detailed example? I am asking because the Spring Cloud Stream documentation on branching looks quite different.
Glad to hear that you are using Spring Cloud Stream and Kafka Streams.
The reference documentation you mentioned is from an older release. Please navigate to the newer docs from this page: https://spring.io/projects/spring-cloud-stream#learn
This question has come up before. See if these help with your use case:
I have integrated the code from sobychacko's linked answer into my code. Thanks to StreamBridge, I only had to add org.springframework.cloud:spring-cloud-stream-binder-kafka as an additional dependency. So far I had only used the org.springframework.cloud:spring-cloud-stream-binder-kafka-streams binder.
Update:
I have since switched from the linked .branch() solution to the simpler .filter() code - see below.
Below is sample code for my two processors handling 3 topics (plus 2 runtime-error topics and 2 deserialization-error topics). The DocumentedErrorOutput class combines key, message and stack trace and is written to the error topic.
spring:
application:
name: scs-sample
kafka:
bootstrap-servers: 'localhost:9092'
jaas:
enabled: false
security:
protocol: PLAINTEXT
properties:
sasl:
mechanism: PLAIN
cloud:
# which steps are performed?
function:
definition: 'process1;process2'
stream:
instance-count: 1 # for local development 1 (is Default)
bindings:
# PROCESSOR process1 - - - - - - - - - - - - - - - - - - - -
process1-in-0:
destination: ${application.topic.topic1}
consumer: # consumer properties on each function (processor) level
concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
ackEachRecord: true # commit the offset after each single record is processed
standardHeaders: timestamp
process1-out-0:
destination: ${application.topic.topic2}
process1-out-error:
destination: ${application.topic.topic1Error}
# PROCESSOR process2 - - - - - - - - - - - - - - - - - - - -
process2-in-0:
destination: ${application.topic.topic2}
consumer: # consumer properties on each function (processor) level
concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
ackEachRecord: true # commit the offset after each single record is processed
standardHeaders: timestamp
process2-out-0:
destination: ${application.topic.topic3}
process2-out-error:
destination: ${application.topic.topic2Error}
kafka:
streams:
binder:
auto-create-topics: false # We do not want topic auto creation (is disabled in our cluster)
functions:
# if we use multiple processors, the application-id MUST BE SET (and unique in cluster)!
process1:
application-id: ${application.id.process1}
# configuration:
# Define producer exception handler (can be done here on process level, but easier in Java)
# default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
process2:
application-id: ${application.id.process2}
# configuration:
# Define producer exception handler (can be done here on process level, but easier in Java)
# default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
# we use DLQs if the JSON cannot be deserialized
deserialization-exception-handler: sendtodlq
required-acks: -1 # all in-sync-replicas
replication-factor: 1 # our replication factor in local dev mode
configuration: # Standard Kafka Streams configuration properties (consumer and producer) on binder level
commit.interval.ms: 1000 # the definition here at the binder level works
default:
# Define producer exception handler (can be done here globally, but easier in Java)
# production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
# producer-properties: # Standard Kafka Streams configuration properties (producer only) on binder level
# consumer-properties: # Standard Kafka Streams configuration properties (consumer only) on binder level
bindings:
process1-in-0:
consumer: # consumer properties on each function's input level
# Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
dlqName: ${application.topic.topic1Dlq}
process1-out-0:
producer: # producer properties on each function's output level
streamPartitionerBeanName: streamPartitionerTopic2
process2-in-0:
consumer: # consumer properties on each function's input level
# Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
dlqName: ${application.topic.topic2Dlq}
process2-out-0:
producer: # producer properties on each function's output level
streamPartitionerBeanName: streamPartitionerTopic3
management:
endpoints:
web:
exposure:
include: ['health', 'info', 'bindings', 'logfile', 'metrics', 'configprops', 'env', 'kafkastreamstopology']
endpoint:
health:
show-details: ALWAYS
health:
binders:
enabled: true # is default
application:
topic:
topic1: topic1
topic1Error: topic1.err
topic1Dlq: topic1.dlq
topic2: topic2
topic2Error: topic2.err
topic2Dlq: topic2.dlq
topic3: topic3
id:
# the id is used as the prefix for changelog (KTable) topics, so it has to be configurable
process1: process1
process2: process2
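The commented-out default.production.exception.handler entries above reference a custom class that is not shown in this post. A minimal sketch, assuming it implements the standard Kafka Streams ProductionExceptionHandler interface (this is an illustration, not the actual implementation), could look like this:
package de.datev.pws.loon.dcp.exceptions;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Hypothetical sketch: log the record that could not be produced and keep the
// stream thread running instead of letting it die.
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        LOGGER.error("Could not produce record to topic {} / partition {}", record.topic(), record.partition(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE; // use FAIL to stop the stream thread instead
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // no extra configuration needed for this sketch
    }
}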
package de.datev.pws.loon.dcp.processor;
import de.datev.pws.loon.dcp.exceptions.DocumentedErrorOutput;
import de.datev.pws.loon.dcp.model.Message1;
import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@Component
public class EventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
private final StreamBridge streamBridge;
private final Processor1 processor1;
private final Processor2 processor2;
public EventProcessor(StreamBridge streamBridge, Processor1 processor1, Processor2 processor2) {
this.streamBridge = streamBridge;
this.processor1 = processor1;
this.processor2 = processor2;
}
@Bean
public Function<KStream<String, Message1>, KStream<String, Message2>> process1() {
final AtomicReference<KeyValue<String, Message2>> result = new AtomicReference<>(null);
return input -> input
.filter(
(messageKey, messageValue) -> {
try { // Call the processor within try/catch
result.set(processor1.streamProcess(messageKey, messageValue));
return true; // We are done and signal with true that the flow has an output
} catch (Exception exception) {
handleProcessingException("process1", messageKey, messageValue, exception);
return false; // We have sent the error and signal with false that there is no output
}
})
.map((messageKey, messageValue) -> result.get());
}
@Bean
public Function<KStream<String, Message2>, KStream<String, Message3>> process2() {
final AtomicReference<KeyValue<String, Message3>> result = new AtomicReference<>(null);
return input -> input
.filter(
(messageKey, messageValue) -> {
try { // Call the processor within try/catch
result.set(processor2.streamProcess(messageKey, messageValue));
return true; // We are done and signal with true that the flow has an output
} catch (Exception exception) {
handleProcessingException("process2", messageKey, messageValue, exception);
return false; // We have sent the error and signal with false that there is no output
}
})
.map((messageKey, messageValue) -> result.get());
}
protected void handleProcessingException(String processName, String messageKey, Object messageValue, Exception exception) {
final DocumentedErrorOutput documentedErrorOutput = new DocumentedErrorOutput(messageKey, messageValue, exception);
final Message<DocumentedErrorOutput> documentedErrorOutputMessage = MessageBuilder.withPayload(documentedErrorOutput).build();
final String bindingName = processName + "-out-error";
LOGGER.error(">>> EXCEPTION in process {} for message={}! Sending problem to out binding \"{}\".", processName, messageValue, bindingName, exception);
streamBridge.send(bindingName, documentedErrorOutputMessage);
}
// Some samples for partitioning
@Bean
public StreamPartitioner<String, Message2> streamPartitionerTopic2() {
LOGGER.info("Performing StreamPartitioner setup for topic2/Message2 using EventProcessor.streamPartitioner");
return (topicName, key, value, totalPartitions) -> {
final int partition = key != null ? Math.abs(key.hashCode()) % totalPartitions : 0;
LOGGER.info(">>> streamPartitionerTopic2 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
return partition;
};
}
@Bean
public StreamPartitioner<String, Message3> streamPartitionerTopic3() {
LOGGER.info("Performing StreamPartitioner setup for topic3/Message3 using EventProcessor.streamPartitioner");
return (topicName, key, value, totalPartitions) -> {
final int partition = key != null ? Math.abs(key.hashCode()) % totalPartitions : 0;
LOGGER.info(">>> streamPartitionerTopic3 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
return partition;
};
}
}
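The DocumentedErrorOutput class used by handleProcessingException above is not included in the listing. A minimal sketch (the exact fields are an assumption) could look like this:
package de.datev.pws.loon.dcp.exceptions;
import java.io.PrintWriter;
import java.io.StringWriter;
// Hypothetical sketch: captures the key, the original payload and the stack trace
// so that the record written to the error topic is self-describing.
public class DocumentedErrorOutput {
    private final String key;
    private final Object message;
    private final String errorMessage;
    private final String stackTrace;
    public DocumentedErrorOutput(String key, Object message, Exception exception) {
        this.key = key;
        this.message = message;
        this.errorMessage = exception.getMessage();
        final StringWriter stringWriter = new StringWriter();
        exception.printStackTrace(new PrintWriter(stringWriter));
        this.stackTrace = stringWriter.toString();
    }
    public String getKey() { return key; }
    public Object getMessage() { return message; }
    public String getErrorMessage() { return errorMessage; }
    public String getStackTrace() { return stackTrace; }
}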
Update:
Sample code of a processor which multiplies a value:
@Component
public class Processor2 {
public KeyValue<String, Message3> streamProcess(String key, Message2 message2) {
Message3 message3 = Message3.builder()
.requestId(message2.getRequestId())
.startTime(message2.getStartTime())
.calculatedValue2(message2.getCalculatedValue1() * 2)
.build();
return new KeyValue<>(key, message3);
}
}