如何将消息从 kafka-sink 路由到多个主题
How to route messages from kafka-sink to multiple topics
我有一个 spring-xd http-processor 模块 http-outbound-gateway 有一个 errorChannel 和 outputChannel。任何带有 HTTP 200 的消息都会进入 outputChannel,其余消息会进入 failureChannel。
现在,http-processor 模块通过 TopicX 连接到 Kafka-Sink 和 kafka-outbound-adapter。 TopicX 仅接收 HTTP 200 消息以供进一步处理。现在,我们需要将 failureChannel 中的消息路由到 TopicY。
如何在 kafka-sink 中发送多个 kafka 主题的消息。我在消息 header 中有 httpStatusCode。我项目中使用的Kafka版本是0.8.2,java版本是1.7
<!-- http-processor-config -->
<int-http:outbound-gateway
request-channel="input"
url-expression="'myUrlLink'"
http-method="POST"
expected-response-type="java.lang.String"
charset="UTF-8"
reply-timeout="10"
reply-channel="output">
<int-http:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="errorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>
在 Kafka Sink 上,我有以下生产者上下文:
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="${topicX}"
key-class-type="java.lang.String"
key-serializer="serializer"
value-class-type="[B"
value-serializer="valueSerializer"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
没错,它不受支持,以后也不会。 Spring XD 今年已经停产了。我们鼓励每个人迁移到 Spring Cloud Data Flow。
对于您的用例,您可以编辑 Kafka Sink 模块配置。为另一个主题再添加一个 <int-kafka:outbound-channel-adapter>
。要决定将传入消息发送到哪个主题,您可以将 <router>
添加到此配置。
或者只是考虑使用Router Sink
。并且每个消息类型都有两个单独的流,因此每个主题。
我终于让它工作了。现在我找到了 0.8.x 版本的解决方法,方法是在 http-processor 模块中添加一个拆分器,并向消息 header 添加一个 kafka_topic 变量。基于 Http 状态代码,我只是设置了不同的主题。
在 Kafka-sink 上,我添加了另一个 producer-configuration 以及通过 XD 参数设置的新主题名称变量。我想不出任何其他解决方案,因为我在多个流中重复使用 kafka-source 和 kafka-sink 模块。
这个特定的 kafka-sink 将输入发送到另一个 XD 流。因此,添加了一个 header-filter 以在下一个流开始时删除 kafka-source 模块中的 kafka_topic。
查找用于设置目标 kafka 主题的行。这就是关键。
我有一个 spring-xd http-processor 模块 http-outbound-gateway 有一个 errorChannel 和 outputChannel。任何带有 HTTP 200 的消息都会进入 outputChannel,其余消息会进入 failureChannel。
现在,http-processor 模块通过 TopicX 连接到 Kafka-Sink 和 kafka-outbound-adapter。 TopicX 仅接收 HTTP 200 消息以供进一步处理。现在,我们需要将 failureChannel 中的消息路由到 TopicY。
如何在 kafka-sink 中发送多个 kafka 主题的消息。我在消息 header 中有 httpStatusCode。我项目中使用的Kafka版本是0.8.2,java版本是1.7
<!-- http-processor-config -->
<int-http:outbound-gateway
request-channel="input"
url-expression="'myUrlLink'"
http-method="POST"
expected-response-type="java.lang.String"
charset="UTF-8"
reply-timeout="10"
reply-channel="output">
<int-http:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="errorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>
在 Kafka Sink 上,我有以下生产者上下文:
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="${topicX}"
key-class-type="java.lang.String"
key-serializer="serializer"
value-class-type="[B"
value-serializer="valueSerializer"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
没错,它不受支持,以后也不会。 Spring XD 今年已经停产了。我们鼓励每个人迁移到 Spring Cloud Data Flow。
对于您的用例,您可以编辑 Kafka Sink 模块配置。为另一个主题再添加一个 <int-kafka:outbound-channel-adapter>
。要决定将传入消息发送到哪个主题,您可以将 <router>
添加到此配置。
或者只是考虑使用Router Sink
。并且每个消息类型都有两个单独的流,因此每个主题。
我终于让它工作了。现在我找到了 0.8.x 版本的解决方法,方法是在 http-processor 模块中添加一个拆分器,并向消息 header 添加一个 kafka_topic 变量。基于 Http 状态代码,我只是设置了不同的主题。
在 Kafka-sink 上,我添加了另一个 producer-configuration 以及通过 XD 参数设置的新主题名称变量。我想不出任何其他解决方案,因为我在多个流中重复使用 kafka-source 和 kafka-sink 模块。
这个特定的 kafka-sink 将输入发送到另一个 XD 流。因此,添加了一个 header-filter 以在下一个流开始时删除 kafka-source 模块中的 kafka_topic。
查找用于设置目标 kafka 主题的行。这就是关键。