Spring Cloud Stream 中的动态目标从 Azure 事件中心到 Kafka

Dynamic destination in Spring Cloud Stream from Azure Event Hub to Kafka

我正在尝试使用 Spring Cloud Stream 来处理发送到 Azure 事件中心实例的消息。这些消息应该路由到 tenant-specific 在运行时根据消息内容在 Kafka 集群上确定的主题。出于开发目的,我通过 Docker 在本地使用 运行 Kafka。 我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这种情况下所需要的。

但是,使我的解决方案起作用的唯一方法是使用 StreamBridge。我宁愿使用动态目标 header spring.cloud.stream.sendto.destination,这样处理器就可以写成 Function<> 而不是 Consumer<>(它不是一个正确的接收器)。这种方法的主要问题是,由于最终解决方案将使用 Spring Data Flow 进行部署,如果使用 StreamBridge,恐怕在配置流时会遇到麻烦。

继续看代码,这是处理器函数,我去掉了不相关的部分

    private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
    private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
    private static final String TENANT_ID_HEADER = "tenant-id";

    @Bean
    public Function<Message<String>, Message<String>>
    routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
        return msg -> {
            final String tenantId = "test";
            final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
            return MessageBuilder.withPayload(msg.getPayload())
                    .setHeader(STREAM_DESTINATION_HEADER, destination)
                    .setHeader(TENANT_ID_HEADER, tenantId)
                    .build();
        };
    }

这是我的 application.yml

spring:
  cloud:
    stream:
      bindings:
        routeMessageToTenantDestination-in-0:
          binder: kafka-evthub
          destination: gateway-report
          group: report-processor
      dynamic-destinations:
      binders:
        kafka-ioc:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: localhost:29092
        kafka-evthub:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: xxxxxxxxxxx.servicebus.windows.net:9093
              configuration:
                sasl:
                  jaas:
                    config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
                  mechanism: PLAIN
                security.protocol: SASL_SSL
      default-binder: kafka-ioc

我在pom.xml

中的相关依赖项
<dependency>
<groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这是每次函数触发时我得到的异常

2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:79)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=14=](RetryingMessageListenerAdapter.java:125)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.doSendMessage(FunctionConfiguration.java:604)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.handleMessageInternal(FunctionConfiguration.java:597)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 32 more

我尝试过不同的方法,f.i。手动创建目标主题,使用分配给 header 的相同名称配置显式目标绑定(不是确定的解决方案,仅用于测试),但我不断收到此异常。我也试过提供一个 NewDestinationBindingCallback<> 并且从打印日志可以看出框架进入了方法,但是我仍然得到同样的错误。

将 Spring Cloud Stream 与事件中心集成的另一种方法也会发生这种情况,即库 azure-spring-cloud-stream-binder-eventhubs.

正如我之前所说,我已经找到了依赖 StreamBridge 的解决方法,但这个解决方案对我来说似乎不太理想,我想了解我遗漏了什么。

编辑:我向前迈出了一小步,并设法通过将 spring 引导启动版本从 2.6.2 降级到 2.4.4

使其工作
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

和设置

    <properties>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
    </properties>

而不是 pom.xml 中的 2021.0.0,如 sobychacko 提供的样本中所见。但是,这似乎是一种倒退,或者我的配置中缺少某些内容以使其适用于最新版本?

不确定究竟是什么导致了您遇到的问题。我刚刚创建了一个基本的 sample app 来演示 sendto.destination header 并验证了该应用程序是否按预期工作。它是一个连接了两个 Kafka 集群的 multi-binder 应用程序。该函数将从第一个集群中消耗,然后使用 sendto header,产生输出到第二个集群。将此示例中的 code/config 与您的应用进行比较,看看缺少什么。

我在您共享的堆栈跟踪中看到对 StreamBridge 的引用。但是,当使用 sendto.destination header 时,它不应经过 StreamBridge.