spring-xd 中是否有 API 来写入消息总线?
Is there an API in spring-xd to write to a message bus?
考虑到可以使用 spring-xd 部署任何消息总线,是否有 spring-xd 提供的任何 API 可以写入部署的消息总线,比如说,Kafka/rabbitmq 基于 xd/config/servers.yml 或任何其他类似地方的配置。
我正在编写一个处理器(itemprocessor),它对数据进行一些处理,编写器暂时将数据写入 rabbitmq 队列。因为,在当前的部署场景中,rabbitmq 可能部署也可能不部署,处理器应该能够写入默认的 Redis 消息 bus.I 知道我可以使用 spring-rabbit 提供的 api 写入 rabbitmq ,但这会将我的处理器绑定到 RabbitMQ。我正在寻找一种方法来概括它。我试图查看 spring-xd 代码,看看是否有这样的例子。我找到了一个 MessageProcessor 示例,但这个是一个流处理器,不确定我如何应用它或者我是否在正确的轨道上。
我刚开始使用 spring-xd,所以如果已经讨论过了,请原谅我的无知。非常感谢任何指点。
更新
谢谢 Gary,根据您的回答,我尝试了 spring-integration jms 示例。
我有一个 spring 批处理作业
<batch:chunk reader="reader" processor="processor" writer="writer" />
我希望将编写器的输出写入任何底层消息总线,首先是 RabiitMQ。所以我根据我在示例中看到的内容添加了以下内容:
<beans:bean id="writer" class="abc" scope="step">
</beans:bean>
<channel id="outputme"/>
<beans:bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.demo"/>
</beans:bean>
<beans:bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.reply"/>
</beans:bean>
<jms:outbound-gateway request-channel="outputme"
request-destination="requestQueue"
reply-channel="jmsReplyChannel"/>
<channel id="jmsReplyChannel" />
<beans:beans profile="default">
<stream:stdout-channel-adapter channel="jmsReplyChannel" append-newline="true"/>
</beans:beans>
当我执行此操作时,我看到以下输出,这使我相信某些内容正在写入嵌入式 ActiveMQ 代理。
16:05:42,400 [AbstractApplicationContext] - Closing org.springframework.context.support.GenericApplicationContext@125a6d70: startup date [Tue Feb 03 16:05:40 PST 2015]; root of context hierarchy
16:05:42,401 [DefaultLifecycleProcessor$LifecycleGroup] - Stopping beans in phase 0
16:05:42,402 [EventDrivenConsumer] - Removing {jms:outbound-gateway} as a subscriber to the 'outputme' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.outputme' has 0 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:05:42,402 [EventDrivenConsumer] - Removing {service-activator} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 1 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
16:05:42,402 [EventDrivenConsumer] - Removing {stream:outbound-channel-adapter(character)} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,403 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 0 subscriber(s).
16:05:42,403 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#2
但是,当我尝试通过像这样更改 connectionfactory 来使用 RabbitMQ 更改 ActiveMQ 时:
<rabbit:connection-factory id="connectionFactory" />
我收到一条错误消息:
Cannot convert value of type [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] to required type [javax.jms.ConnectionFactory] for property 'connectionFactory'
我根据 http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 处的架构文件中提到的内容理解这一点,因为元素连接工厂所需的类型是 javax.jms.ConnectionFactory。我环顾四周,没有找到一种方法来为 RabbitMQ 创建 conenctionFactory,就像我们为 ActiveMQ 创建 ConnectionFactory 一样。
我环顾四周,不确定如何解决这个问题。也许我错过了一些非常基本的东西。我什至不确定这是否是正确的方法。你能告诉我我错过了什么吗?这是否是正确的方法?如果已经讨论过了,我提前道歉。
再次感谢您的宝贵时间。
非常感谢您的宝贵时间。
问候,
爱丽丝
MessageBus
SPI 非常适合 XD 模块内通信;它不是为任意应用程序级消息传递而设计的。
也就是说,XD(及其消息总线实现)广泛使用 Spring Integration project。
该项目提供了您需要的抽象。您可以发送到消息通道(使用 MessagingGateway
或 MessagingTemplate
,并且在该通道的下游,您可以连接任何类型的通道适配器(rabbit [amqp]、redis 等)。
因此您的项目处理器与接收消息的实际技术分离。
查看 Spring 集成参考手册(项目页面上有 link)。
考虑到可以使用 spring-xd 部署任何消息总线,是否有 spring-xd 提供的任何 API 可以写入部署的消息总线,比如说,Kafka/rabbitmq 基于 xd/config/servers.yml 或任何其他类似地方的配置。
我正在编写一个处理器(itemprocessor),它对数据进行一些处理,编写器暂时将数据写入 rabbitmq 队列。因为,在当前的部署场景中,rabbitmq 可能部署也可能不部署,处理器应该能够写入默认的 Redis 消息 bus.I 知道我可以使用 spring-rabbit 提供的 api 写入 rabbitmq ,但这会将我的处理器绑定到 RabbitMQ。我正在寻找一种方法来概括它。我试图查看 spring-xd 代码,看看是否有这样的例子。我找到了一个 MessageProcessor 示例,但这个是一个流处理器,不确定我如何应用它或者我是否在正确的轨道上。
我刚开始使用 spring-xd,所以如果已经讨论过了,请原谅我的无知。非常感谢任何指点。
更新
谢谢 Gary,根据您的回答,我尝试了 spring-integration jms 示例。
我有一个 spring 批处理作业
<batch:chunk reader="reader" processor="processor" writer="writer" />
我希望将编写器的输出写入任何底层消息总线,首先是 RabiitMQ。所以我根据我在示例中看到的内容添加了以下内容:
<beans:bean id="writer" class="abc" scope="step">
</beans:bean>
<channel id="outputme"/>
<beans:bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.demo"/>
</beans:bean>
<beans:bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<beans:constructor-arg value="queue.reply"/>
</beans:bean>
<jms:outbound-gateway request-channel="outputme"
request-destination="requestQueue"
reply-channel="jmsReplyChannel"/>
<channel id="jmsReplyChannel" />
<beans:beans profile="default">
<stream:stdout-channel-adapter channel="jmsReplyChannel" append-newline="true"/>
</beans:beans>
当我执行此操作时,我看到以下输出,这使我相信某些内容正在写入嵌入式 ActiveMQ 代理。
16:05:42,400 [AbstractApplicationContext] - Closing org.springframework.context.support.GenericApplicationContext@125a6d70: startup date [Tue Feb 03 16:05:40 PST 2015]; root of context hierarchy
16:05:42,401 [DefaultLifecycleProcessor$LifecycleGroup] - Stopping beans in phase 0
16:05:42,402 [EventDrivenConsumer] - Removing {jms:outbound-gateway} as a subscriber to the 'outputme' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.outputme' has 0 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:05:42,402 [EventDrivenConsumer] - Removing {service-activator} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 1 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
16:05:42,402 [EventDrivenConsumer] - Removing {stream:outbound-channel-adapter(character)} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,403 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 0 subscriber(s).
16:05:42,403 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#2
但是,当我尝试通过像这样更改 connectionfactory 来使用 RabbitMQ 更改 ActiveMQ 时:
<rabbit:connection-factory id="connectionFactory" />
我收到一条错误消息:
Cannot convert value of type [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] to required type [javax.jms.ConnectionFactory] for property 'connectionFactory'
我根据 http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 处的架构文件中提到的内容理解这一点,因为元素连接工厂所需的类型是 javax.jms.ConnectionFactory。我环顾四周,没有找到一种方法来为 RabbitMQ 创建 conenctionFactory,就像我们为 ActiveMQ 创建 ConnectionFactory 一样。
我环顾四周,不确定如何解决这个问题。也许我错过了一些非常基本的东西。我什至不确定这是否是正确的方法。你能告诉我我错过了什么吗?这是否是正确的方法?如果已经讨论过了,我提前道歉。
再次感谢您的宝贵时间。
非常感谢您的宝贵时间。
问候,
爱丽丝
MessageBus
SPI 非常适合 XD 模块内通信;它不是为任意应用程序级消息传递而设计的。
也就是说,XD(及其消息总线实现)广泛使用 Spring Integration project。
该项目提供了您需要的抽象。您可以发送到消息通道(使用 MessagingGateway
或 MessagingTemplate
,并且在该通道的下游,您可以连接任何类型的通道适配器(rabbit [amqp]、redis 等)。
因此您的项目处理器与接收消息的实际技术分离。
查看 Spring 集成参考手册(项目页面上有 link)。