Spring Cloud Stream 中的动态目标从 Azure 事件中心到 Kafka
Dynamic destination in Spring Cloud Stream from Azure Event Hub to Kafka
我正在尝试使用 Spring Cloud Stream 来处理发送到 Azure 事件中心实例的消息。这些消息应该路由到 tenant-specific 在运行时根据消息内容在 Kafka 集群上确定的主题。出于开发目的,我通过 Docker 在本地使用 运行 Kafka。
我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这种情况下所需要的。
但是,使我的解决方案起作用的唯一方法是使用 StreamBridge
。我宁愿使用动态目标 header spring.cloud.stream.sendto.destination
,这样处理器就可以写成 Function<>
而不是 Consumer<>
(它不是一个正确的接收器)。这种方法的主要问题是,由于最终解决方案将使用 Spring Data Flow 进行部署,如果使用 StreamBridge,恐怕在配置流时会遇到麻烦。
继续看代码,这是处理器函数,我去掉了不相关的部分
private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
private static final String TENANT_ID_HEADER = "tenant-id";
@Bean
public Function<Message<String>, Message<String>>
routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
return msg -> {
final String tenantId = "test";
final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
return MessageBuilder.withPayload(msg.getPayload())
.setHeader(STREAM_DESTINATION_HEADER, destination)
.setHeader(TENANT_ID_HEADER, tenantId)
.build();
};
}
这是我的 application.yml
spring:
cloud:
stream:
bindings:
routeMessageToTenantDestination-in-0:
binder: kafka-evthub
destination: gateway-report
group: report-processor
dynamic-destinations:
binders:
kafka-ioc:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: localhost:29092
kafka-evthub:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: xxxxxxxxxxx.servicebus.windows.net:9093
configuration:
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
mechanism: PLAIN
security.protocol: SASL_SSL
default-binder: kafka-ioc
我在pom.xml
中的相关依赖项
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这是每次函数触发时我得到的异常
2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:79)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=14=](RetryingMessageListenerAdapter.java:125)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.doSendMessage(FunctionConfiguration.java:604)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.handleMessageInternal(FunctionConfiguration.java:597)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 32 more
我尝试过不同的方法,f.i。手动创建目标主题,使用分配给 header 的相同名称配置显式目标绑定(不是确定的解决方案,仅用于测试),但我不断收到此异常。我也试过提供一个 NewDestinationBindingCallback<>
并且从打印日志可以看出框架进入了方法,但是我仍然得到同样的错误。
将 Spring Cloud Stream 与事件中心集成的另一种方法也会发生这种情况,即库 azure-spring-cloud-stream-binder-eventhubs
.
正如我之前所说,我已经找到了依赖 StreamBridge 的解决方法,但这个解决方案对我来说似乎不太理想,我想了解我遗漏了什么。
编辑:我向前迈出了一小步,并设法通过将 spring 引导启动版本从 2.6.2
降级到 2.4.4
使其工作
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
和设置
<properties>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
而不是 pom.xml 中的 2021.0.0
,如 sobychacko 提供的样本中所见。但是,这似乎是一种倒退,或者我的配置中缺少某些内容以使其适用于最新版本?
不确定究竟是什么导致了您遇到的问题。我刚刚创建了一个基本的 sample app 来演示 sendto.destination
header 并验证了该应用程序是否按预期工作。它是一个连接了两个 Kafka 集群的 multi-binder 应用程序。该函数将从第一个集群中消耗,然后使用 sendto
header,产生输出到第二个集群。将此示例中的 code/config 与您的应用进行比较,看看缺少什么。
我在您共享的堆栈跟踪中看到对 StreamBridge
的引用。但是,当使用 sendto.destination
header 时,它不应经过 StreamBridge
.
我正在尝试使用 Spring Cloud Stream 来处理发送到 Azure 事件中心实例的消息。这些消息应该路由到 tenant-specific 在运行时根据消息内容在 Kafka 集群上确定的主题。出于开发目的,我通过 Docker 在本地使用 运行 Kafka。 我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这种情况下所需要的。
但是,使我的解决方案起作用的唯一方法是使用 StreamBridge
。我宁愿使用动态目标 header spring.cloud.stream.sendto.destination
,这样处理器就可以写成 Function<>
而不是 Consumer<>
(它不是一个正确的接收器)。这种方法的主要问题是,由于最终解决方案将使用 Spring Data Flow 进行部署,如果使用 StreamBridge,恐怕在配置流时会遇到麻烦。
继续看代码,这是处理器函数,我去掉了不相关的部分
private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
private static final String TENANT_ID_HEADER = "tenant-id";
@Bean
public Function<Message<String>, Message<String>>
routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
return msg -> {
final String tenantId = "test";
final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
return MessageBuilder.withPayload(msg.getPayload())
.setHeader(STREAM_DESTINATION_HEADER, destination)
.setHeader(TENANT_ID_HEADER, tenantId)
.build();
};
}
这是我的 application.yml
spring:
cloud:
stream:
bindings:
routeMessageToTenantDestination-in-0:
binder: kafka-evthub
destination: gateway-report
group: report-processor
dynamic-destinations:
binders:
kafka-ioc:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: localhost:29092
kafka-evthub:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: xxxxxxxxxxx.servicebus.windows.net:9093
configuration:
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
mechanism: PLAIN
security.protocol: SASL_SSL
default-binder: kafka-ioc
我在pom.xml
中的相关依赖项<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这是每次函数触发时我得到的异常
2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:79)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=14=](RetryingMessageListenerAdapter.java:125)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.doSendMessage(FunctionConfiguration.java:604)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.handleMessageInternal(FunctionConfiguration.java:597)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 32 more
我尝试过不同的方法,f.i。手动创建目标主题,使用分配给 header 的相同名称配置显式目标绑定(不是确定的解决方案,仅用于测试),但我不断收到此异常。我也试过提供一个 NewDestinationBindingCallback<>
并且从打印日志可以看出框架进入了方法,但是我仍然得到同样的错误。
将 Spring Cloud Stream 与事件中心集成的另一种方法也会发生这种情况,即库 azure-spring-cloud-stream-binder-eventhubs
.
正如我之前所说,我已经找到了依赖 StreamBridge 的解决方法,但这个解决方案对我来说似乎不太理想,我想了解我遗漏了什么。
编辑:我向前迈出了一小步,并设法通过将 spring 引导启动版本从 2.6.2
降级到 2.4.4
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
和设置
<properties>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
而不是 pom.xml 中的 2021.0.0
,如 sobychacko 提供的样本中所见。但是,这似乎是一种倒退,或者我的配置中缺少某些内容以使其适用于最新版本?
不确定究竟是什么导致了您遇到的问题。我刚刚创建了一个基本的 sample app 来演示 sendto.destination
header 并验证了该应用程序是否按预期工作。它是一个连接了两个 Kafka 集群的 multi-binder 应用程序。该函数将从第一个集群中消耗,然后使用 sendto
header,产生输出到第二个集群。将此示例中的 code/config 与您的应用进行比较,看看缺少什么。
我在您共享的堆栈跟踪中看到对 StreamBridge
的引用。但是,当使用 sendto.destination
header 时,它不应经过 StreamBridge
.