spring-xd 中是否有 API 来写入消息总线?

Is there an API in spring-xd to write to a message bus?

考虑到可以使用 spring-xd 部署任何消息总线,是否有 spring-xd 提供的任何 API 可以写入部署的消息总线,比如说,Kafka/rabbitmq 基于 xd/config/servers.yml 或任何其他类似地方的配置。

我正在编写一个处理器(itemprocessor),它对数据进行一些处理,编写器暂时将数据写入 rabbitmq 队列。因为,在当前的部署场景中,rabbitmq 可能部署也可能不部署,处理器应该能够写入默认的 Redis 消息 bus.I 知道我可以使用 spring-rabbit 提供的 api 写入 rabbitmq ,但这会将我的处理器绑定到 RabbitMQ。我正在寻找一种方法来概括它。我试图查看 spring-xd 代码,看看是否有这样的例子。我找到了一个 MessageProcessor 示例,但这个是一个流处理器,不确定我如何应用它或者我是否在正确的轨道上。

https://github.com/spring-projects/spring-xd/blob/master/spring-xd-rxjava/src/test/java/org/springframework/xd/rxjava/PongMessageProcessor.java

我刚开始使用 spring-xd,所以如果已经讨论过了,请原谅我的无知。非常感谢任何指点。

更新

谢谢 Gary,根据您的回答,我尝试了 spring-integration jms 示例。

我有一个 spring 批处理作业

<batch:chunk reader="reader" processor="processor" writer="writer" /> 

我希望将编写器的输出写入任何底层消息总线,首先是 RabiitMQ。所以我根据我在示例中看到的内容添加了以下内容:

 <beans:bean id="writer" class="abc" scope="step">
</beans:bean>

<channel id="outputme"/>

<beans:bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <beans:constructor-arg value="queue.demo"/>
</beans:bean>

<beans:bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <beans:constructor-arg value="queue.reply"/>
</beans:bean>

<jms:outbound-gateway request-channel="outputme"
                      request-destination="requestQueue"
                      reply-channel="jmsReplyChannel"/>

<channel id="jmsReplyChannel" />

<beans:beans profile="default">

    <stream:stdout-channel-adapter channel="jmsReplyChannel" append-newline="true"/>

</beans:beans>

当我执行此操作时,我看到以下输出,这使我相信某些内容正在写入嵌入式 ActiveMQ 代理。

16:05:42,400 [AbstractApplicationContext] - Closing org.springframework.context.support.GenericApplicationContext@125a6d70: startup date [Tue Feb 03 16:05:40 PST 2015]; root of context hierarchy
16:05:42,401 [DefaultLifecycleProcessor$LifecycleGroup] - Stopping beans in phase 0
16:05:42,402 [EventDrivenConsumer] - Removing {jms:outbound-gateway} as a subscriber to the 'outputme' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.outputme' has 0 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:05:42,402 [EventDrivenConsumer] - Removing {service-activator} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,402 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 1 subscriber(s).
16:05:42,402 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
16:05:42,402 [EventDrivenConsumer] - Removing {stream:outbound-channel-adapter(character)} as a subscriber to the 'jmsReplyChannel' channel
16:05:42,403 [AbstractSubscribableChannel] - Channel 'org.springframework.context.support.GenericApplicationContext@125a6d70.jmsReplyChannel' has 0 subscriber(s).
16:05:42,403 [AbstractEndpoint] - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#2

但是,当我尝试通过像这样更改 connectionfactory 来使用 RabbitMQ 更改 ActiveMQ 时:

<rabbit:connection-factory id="connectionFactory" />

我收到一条错误消息:

 Cannot convert value of type [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] to required type [javax.jms.ConnectionFactory] for property 'connectionFactory'

我根据 http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 处的架构文件中提到的内容理解这一点,因为元素连接工厂所需的类型是 javax.jms.ConnectionFactory。我环顾四周,没有找到一种方法来为 RabbitMQ 创建 conenctionFactory,就像我们为 ActiveMQ 创建 ConnectionFactory 一样。

我环顾四周,不确定如何解决这个问题。也许我错过了一些非常基本的东西。我什至不确定这是否是正确的方法。你能告诉我我错过了什么吗?这是否是正确的方法?如果已经讨论过了,我提前道歉。

再次感谢您的宝贵时间。


非常感谢您的宝贵时间。

问候,

爱丽丝

MessageBus SPI 非常适合 XD 模块内通信;它不是为任意应用程序级消息传递而设计的。

也就是说,XD(及其消息总线实现)广泛使用 Spring Integration project

该项目提供了您需要的抽象。您可以发送到消息通道(使用 MessagingGatewayMessagingTemplate,并且在该通道的下游,您可以连接任何类型的通道适配器(rabbit [amqp]、redis 等)。

因此您的项目处理器与接收消息的实际技术分离。

查看 Spring 集成参考手册(项目页面上有 link)。