动态生成具有入站通道和响应通道的 TCP 客户端
Dynamic generation of TCP client with inbound channel and respond channel
我是 Spring 集成的新手。
使用 Spring 4,只有 java 个注释。
我现在工作的项目我们在 属性 文件中设置了 tcp 连接。
目前它被硬编码为只有 2 个不同的连接,必须将其更改为更动态的方法,我们可以在 属性 文件中设置可变数量的连接并能够添加新的运行时的。
我知道 dynamic tcp client example 的存在,并试图将我的工作建立在它的基础上。
首先我们为连接设置以下 bean:
@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
env.getProperty("socket.tcp.nodes[0].ip"),
env.getProperty("socket.tcp.nodes[0].port", Integer.class)
);
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
tcpNetClientConnectionFactory.setSerializer(by);
tcpNetClientConnectionFactory.setDeserializer(by);
return tcpNetClientConnectionFactory;
}
然后我们有等待发送的适配器:
@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
@Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
return adapter;
}
调用 fromTcp() 时,它会转换消息,以下代码会将其发送到另一个应用程序以进行进一步处理。
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
处理消息后,我们必须发送响应。
@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
return tcpSendingMsgHandler;
}
并使用网关:
@MessagingGateway()
public interface MessageTcpGateway {
@Gateway(requestChannel = "toTcpCh01")
ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}
我们将其寄回。
通过示例,我可以了解如何为响应动态创建流。
但我无法理解如何创建公共连接池,然后基于这些连接工厂动态创建侦听适配器和响应适配器,然后在运行时 closing/removing 它们。
感谢
,我有点了解如何使用入站适配器创建流
我是否需要为每个适配器创建多个单独的 IntegrationFlow?所以所有的调用和响应都可以异步处理(我对异步的理解可能是错误的)
然后在要关闭连接时分别处理它们?比如调用 TcpReceivingChannelAdapter,然后调用 TcpSendingMessageHandler,最后注销 connectonfactory?
我不认为 Collaborating Channel Adapters 需要 TcpReceivingChannelAdapter
和 TcpSendingMessageHandler
的单独 IntegrationFlow
定义。它确实可以作为一个 IntegrationFlow
从 TcpReceivingChannelAdapter
开始并以 TcpSendingMessageHandler
结束。关键是 IntegrationFlow
本身只是一个对组件引用进行分组的逻辑容器。艰苦的工作实际上是由您在那里声明的所有那些组件完成的,并且有了这个 TcpReceivingChannelAdapter
到 TcpSendingMessageHandler
和中间的网关,您真的会异步。
请记住 ByteArrayLengthHeaderSerializer
也必须声明为 bean。不确定每个动态流是否需要一个单独的实例,但这里有一个 API 可以从那里做到这一点:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);
我是 Spring 集成的新手。
使用 Spring 4,只有 java 个注释。
我现在工作的项目我们在 属性 文件中设置了 tcp 连接。
目前它被硬编码为只有 2 个不同的连接,必须将其更改为更动态的方法,我们可以在 属性 文件中设置可变数量的连接并能够添加新的运行时的。
我知道 dynamic tcp client example 的存在,并试图将我的工作建立在它的基础上。
首先我们为连接设置以下 bean:
@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
env.getProperty("socket.tcp.nodes[0].ip"),
env.getProperty("socket.tcp.nodes[0].port", Integer.class)
);
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
tcpNetClientConnectionFactory.setSerializer(by);
tcpNetClientConnectionFactory.setDeserializer(by);
return tcpNetClientConnectionFactory;
}
然后我们有等待发送的适配器:
@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
@Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
return adapter;
}
调用 fromTcp() 时,它会转换消息,以下代码会将其发送到另一个应用程序以进行进一步处理。
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
处理消息后,我们必须发送响应。
@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
return tcpSendingMsgHandler;
}
并使用网关:
@MessagingGateway()
public interface MessageTcpGateway {
@Gateway(requestChannel = "toTcpCh01")
ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}
我们将其寄回。
通过示例,我可以了解如何为响应动态创建流。
但我无法理解如何创建公共连接池,然后基于这些连接工厂动态创建侦听适配器和响应适配器,然后在运行时 closing/removing 它们。
感谢
我是否需要为每个适配器创建多个单独的 IntegrationFlow?所以所有的调用和响应都可以异步处理(我对异步的理解可能是错误的)
然后在要关闭连接时分别处理它们?比如调用 TcpReceivingChannelAdapter,然后调用 TcpSendingMessageHandler,最后注销 connectonfactory?
我不认为 Collaborating Channel Adapters 需要 TcpReceivingChannelAdapter
和 TcpSendingMessageHandler
的单独 IntegrationFlow
定义。它确实可以作为一个 IntegrationFlow
从 TcpReceivingChannelAdapter
开始并以 TcpSendingMessageHandler
结束。关键是 IntegrationFlow
本身只是一个对组件引用进行分组的逻辑容器。艰苦的工作实际上是由您在那里声明的所有那些组件完成的,并且有了这个 TcpReceivingChannelAdapter
到 TcpSendingMessageHandler
和中间的网关,您真的会异步。
请记住 ByteArrayLengthHeaderSerializer
也必须声明为 bean。不确定每个动态流是否需要一个单独的实例,但这里有一个 API 可以从那里做到这一点:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);