Example on handling processing exception in Spring Cloud Streams with Kafka Streams Binder and the functional style processor

I am using Spring Cloud Stream with the Kafka Streams Binder, the functional style processor API, and multiple processors.

It is really cool to configure a processing application with multiple processors and multiple Kafka topics this way and to stay in the Spring Boot universe with /actuator, WebClient, and so on. I actually prefer it to using plain Apache Kafka Streams.

But: I want to integrate exception handling for exceptions that occur in the processors and send those unprocessable messages to a DLQ. I have already set up DLQs for deserialization errors, but apart from sobychacko's answer on a similar question I have not found any good advice on how to achieve this, and that answer is only a snippet! Does anybody have a more detailed example? I am asking because the Spring Cloud Stream documentation on branching looks quite different.

Nice to hear that you are using Spring Cloud Stream and Kafka Streams.

The reference documentation you mentioned is from an older release. Please navigate to the newer docs from this page: https://spring.io/projects/spring-cloud-stream#learn

This question has come up before. Check whether these are helpful for your use case:

I have integrated the code from sobychacko's link into my code. Thanks to the StreamBridge, I only had to add org.springframework.cloud:spring-cloud-stream-binder-kafka as an additional dependency. Until now I had only used the org.springframework.cloud:spring-cloud-stream-binder-kafka-streams binder.

Update: I have since switched from the linked .branch() solution to the simpler .filter() code shown below.

Below is sample code for my two processors, which work on 3 topics (plus 2 runtime-error topics and 2 deserialization-error DLQ topics). The DocumentedErrorOutput class combines the key, the message, and the stack trace; it is written to the corresponding error topic.
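
A minimal sketch of what such a DocumentedErrorOutput class could look like (the exact package and field names here are assumptions, not the original class):

package de.datev.pws.loon.dcp.exceptions;

import java.io.PrintWriter;
import java.io.StringWriter;

// Sketch of an error payload: combines key, message and stack trace
// so that it can be serialized (e.g. as JSON) to the error topic.
public class DocumentedErrorOutput {

    private final Object key;
    private final Object message;
    private final String stackTrace;

    public DocumentedErrorOutput(Object key, Object message, Exception exception) {
        this.key = key;
        this.message = message;
        // Render the stack trace into a String so it survives serialization
        final StringWriter stringWriter = new StringWriter();
        exception.printStackTrace(new PrintWriter(stringWriter));
        this.stackTrace = stringWriter.toString();
    }

    public Object getKey() { return key; }
    public Object getMessage() { return message; }
    public String getStackTrace() { return stackTrace; }
}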

spring:
  application:
    name: scs-sample
  kafka:
    bootstrap-servers: 'localhost:9092'
    jaas:
      enabled: false
    security:
      protocol: PLAINTEXT
    properties:
      sasl:
        mechanism: PLAIN
  cloud:
    # which steps are performed?
    function:
      definition: 'process1;process2'
    stream:
      instance-count: 1 # for local development 1 (is Default)
      bindings:
        # PROCESSOR process1 - - - - - - - - - - - - - - - - - - - -
        process1-in-0:
          destination: ${application.topic.topic1}
          consumer: # consumer properties on each function (processor) level
            concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
            ackEachRecord: true # commit the offset after each single record is processed
            standardHeaders: timestamp
        process1-out-0:
          destination: ${application.topic.topic2}
        process1-out-error:
          destination: ${application.topic.topic1Error}
        # PROCESSOR process2 - - - - - - - - - - - - - - - - - - - -
        process2-in-0:
          destination: ${application.topic.topic2}
          consumer: # consumer properties on each function (processor) level
            concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
            ackEachRecord: true # commit the offset after each single record is processed
            standardHeaders: timestamp
        process2-out-0:
          destination: ${application.topic.topic3}
        process2-out-error:
          destination: ${application.topic.topic2Error}
      kafka:
        streams:
          binder:
            auto-create-topics: false # We do not want topic auto creation (is disabled in our cluster)
            functions:
              # if we use multiple processors, the application-id MUST BE SET (and unique in cluster)!
              process1:
                application-id: ${application.id.process1}
                # configuration:
                  # Define producer exception handler (can be done here on process level, but easier in Java)
                  # default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
              process2:
                application-id: ${application.id.process2}
                # configuration:
                  # Define producer exception handler (can be done here on process level, but easier in Java)
                  # default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
            # we use DLQs if the JSON cannot be deserialized
            deserialization-exception-handler: sendtodlq
            required-acks: -1  # all in-sync-replicas
            replication-factor: 1 # our replication factor in local dev mode
            configuration: # Standard Kafka Streams configuration properties (consumer and producer) on binder level
              commit.interval.ms: 1000 # the definition here at the binder level works
              default:
                # Define producer exception handler (can be done here globally, but easier in Java)
                # production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            # producer-properties: # Standard Kafka Streams configuration properties (producer only) on binder level
            # consumer-properties: # Standard Kafka Streams configuration properties (consumer only) on binder level
          bindings:
            process1-in-0:
              consumer: # consumer properties on each function's input level
                # Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
                dlqName: ${application.topic.topic1Dlq}
            process1-out-0:
              producer: # producer properties on each function's output level
                streamPartitionerBeanName: streamPartitionerTopic2
            process2-in-0:
              consumer: # consumer properties on each function's input level
                # Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
                dlqName: ${application.topic.topic2Dlq}
            process2-out-0:
              producer: # producer properties on each function's output level
                streamPartitionerBeanName: streamPartitionerTopic3

management:
  endpoints:
    web:
      exposure:
        include: ['health', 'info', 'bindings', 'logfile', 'metrics', 'configprops', 'env', 'kafkastreamstopology']
  endpoint:
    health:
      show-details: ALWAYS
  health:
    binders:
      enabled: true # is default

application:
  topic:
    topic1: topic1
    topic1Error: topic1.err
    topic1Dlq: topic1.dlq
    topic2: topic2
    topic2Error: topic2.err
    topic2Dlq: topic2.dlq
    topic3: topic3
  id:
    # the id is used as the prefix for changelog (KTable) topics, so it has to be configurable
    process1: process1
    process2: process2
package de.datev.pws.loon.dcp.processor;

import de.datev.pws.loon.dcp.exceptions.DocumentedErrorOutput;
import de.datev.pws.loon.dcp.model.Message1;
import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

@Component
public class EventProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    private final StreamBridge streamBridge;
    private final Processor1 processor1;
    private final Processor2 processor2;

    public EventProcessor(StreamBridge streamBridge, Processor1 processor1, Processor2 processor2) {

        this.streamBridge = streamBridge;
        this.processor1 = processor1;
        this.processor2 = processor2;
    }

    @Bean
    public Function<KStream<String, Message1>, KStream<String, Message2>> process1() {

        final AtomicReference<KeyValue<String, Message2>> result = new AtomicReference<>(null);
        return input -> input
            .filter(
                (messageKey, messageValue) -> {
                    try { // Call the processor within try/catch
                        result.set(processor1.streamProcess(messageKey, messageValue));
                        return true; // Signal with true that the flow has an output
                    } catch (Exception exception) {
                        handleProcessingException("process1", messageKey, messageValue, exception);
                        return false; // We have sent the error and signal with false that there is no output
                    }
                })
            .map((messageKey, messageValue) -> result.get());
    }

    @Bean
    public Function<KStream<String, Message2>, KStream<String, Message3>> process2() {

        final AtomicReference<KeyValue<String, Message3>> result = new AtomicReference<>(null);
        return input -> input
            .filter(
                (messageKey, messageValue) -> {
                    try { // Call the processor within try/catch
                        result.set(processor2.streamProcess(messageKey, messageValue));
                        return true; // Signal with true that the flow has an output
                    } catch (Exception exception) {
                        handleProcessingException("process2", messageKey, messageValue, exception);
                        return false; // We have sent the error and signal with false that there is no output
                    }
                })
            .map((messageKey, messageValue) -> result.get());
    }

    protected void handleProcessingException(String processName, String messageKey, Object messageValue, Exception exception) {

        final DocumentedErrorOutput documentedErrorOutput = new DocumentedErrorOutput(messageKey, messageValue, exception);
        final Message<DocumentedErrorOutput> documentedErrorOutputMessage = MessageBuilder.withPayload(documentedErrorOutput).build();
        final String bindingName = processName + "-out-error";
        LOGGER.error(">>> EXCEPTION in process {} for message={}! Sending problem to out binding \"{}\".", processName, messageValue, bindingName, exception);
        streamBridge.send(bindingName, documentedErrorOutputMessage);
    }

    // Some samples for partitioning

    @Bean
    public StreamPartitioner<String, Message2> streamPartitionerTopic2() {

        LOGGER.info("Performing StreamPartitioner setup for topic2/Message2 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            final int partition = key != null ? Math.abs(key.hashCode()) % totalPartitions : 0;
            LOGGER.info(">>> streamPartitionerTopic2 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }

    @Bean
    public StreamPartitioner<String, Message3> streamPartitionerTopic3() {

        LOGGER.info("Performing StreamPartitioner setup for topic3/Message3 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            final int partition = key != null ? Math.abs(key.hashCode()) % totalPartitions : 0;
            LOGGER.info(">>> streamPartitionerTopic3 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }
}

Update:

Sample code of a processor that multiplies a value:

package de.datev.pws.loon.dcp.processor;

import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.springframework.stereotype.Component;

@Component
public class Processor2 {

    public KeyValue<String, Message3> streamProcess(String key, Message2 message2) {

        Message3 message3 = Message3.builder()
            .requestId(message2.getRequestId())
            .startTime(message2.getStartTime())
            .calculatedValue2(message2.getCalculatedValue1() * 2)
            .build();
        return new KeyValue<>(key, message3);
    }
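
For completeness, Processor1 (referenced by process1 but not shown above) could look analogous. This is only a hypothetical sketch; the Message1/Message2 field names are assumptions mirroring Processor2:

@Component
public class Processor1 {

    public KeyValue<String, Message2> streamProcess(String key, Message1 message1) {

        // Hypothetical mapping from Message1 to Message2; adjust to your own model
        Message2 message2 = Message2.builder()
            .requestId(message1.getRequestId())
            .startTime(message1.getStartTime())
            .calculatedValue1(42) // placeholder for the real calculation
            .build();
        return new KeyValue<>(key, message2);
    }
}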
}