如何让 kafka 在 spring xd runtime 上使用 http 流数据?
how to get kafka to consume http streaming data on spring xd runtime?
是否可以让 kafka 源模块在 spring xd 运行时作为处理器模块工作?有代码示例吗?
我正在努力实现这样的目标:http(xd 源)| kafka源码(xd处理器)| kafka 消费者(xd 接收器)
我正在尝试这样做,因为我有流数据通过 http 传输,我想用 kafka 消息总线进行管理。
我的流定义是这样的:
stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy
在 spring xd 的处理器模块中开箱即用的 kafka 源模块实现会导致如下错误:
2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message
org.springframework.messaging.MessageDeliveryException: Dispatcher 没有频道 'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0' 的订阅者。嵌套异常是 org.springframework.integration.MessageDispatchingException: Dispatcher 没有订阅者
在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
I am trying to do this because I have streaming data coming over http which I want to manage with kafka message bus.
如果您使用 kafka 作为消息总线(设置传输后),那么像 "http | log" 这样的流将使 http 消息流经 kafka 消息总线。在这种情况下,Kafka 代理中的主题将由 XD 内部定义。
Is it possible to have kafka source module work as a processor module in spring xd runtime?
不可以,源模块不能充当处理器模块。如果您希望消息流经 Kafka 中的特定主题,那么您可以拥有一个流,该流具有一个从 http 源接收数据的 kafka 接收器模块,另一个流使用 same[=23= 配置 kafka 源模块] 主题。
并且,这可以通过以下方式实现:
stream create KafkaSink --definition "http --outputType=application/json | kafka --brokerList= --topic=kafkaTestTopic" --deploy
stream create KafkaSource --definition "kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log" --deploy
是否可以让 kafka 源模块在 spring xd 运行时作为处理器模块工作?有代码示例吗?
我正在努力实现这样的目标:http(xd 源)| kafka源码(xd处理器)| kafka 消费者(xd 接收器)
我正在尝试这样做,因为我有流数据通过 http 传输,我想用 kafka 消息总线进行管理。
我的流定义是这样的:
stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy
在 spring xd 的处理器模块中开箱即用的 kafka 源模块实现会导致如下错误:
2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message
org.springframework.messaging.MessageDeliveryException: Dispatcher 没有频道 'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0' 的订阅者。嵌套异常是 org.springframework.integration.MessageDispatchingException: Dispatcher 没有订阅者 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
I am trying to do this because I have streaming data coming over http which I want to manage with kafka message bus.
如果您使用 kafka 作为消息总线(设置传输后),那么像 "http | log" 这样的流将使 http 消息流经 kafka 消息总线。在这种情况下,Kafka 代理中的主题将由 XD 内部定义。
Is it possible to have kafka source module work as a processor module in spring xd runtime?
不可以,源模块不能充当处理器模块。如果您希望消息流经 Kafka 中的特定主题,那么您可以拥有一个流,该流具有一个从 http 源接收数据的 kafka 接收器模块,另一个流使用 same[=23= 配置 kafka 源模块] 主题。
并且,这可以通过以下方式实现:
stream create KafkaSink --definition "http --outputType=application/json | kafka --brokerList= --topic=kafkaTestTopic" --deploy
stream create KafkaSource --definition "kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log" --deploy