如何使用 QueueChannel 和 ServiceActivator 正确配置 TCP inboundAdapter
How to properly configure a TCP inboundAdapter with QueueChannel and ServiceActivator
我正在尝试配置一个 TCP 套接字,以在不同的消息中接收格式为 name,value
的数据。这些消息平均每秒到达一次,时快时慢。
我能够设置一个工作配置,但我对 Spring 集成中实际发生的事情缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
CSVProcessingService
看起来像这样(缩写):
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
- 在配置的端口上接收消息
- 承受更高的负载而不丢失消息
- 反序列化消息
- 放入排队通道
- (最好也记录错误)
- 队列通道每 50 毫秒轮询一次,消息从队列通道传递到
ObjectToStringTransformer
- 转换后的消息被传递给
CSVProcessingService
进一步处理
我是否正确地实现了所有这些目标,还是因为我误解了 Spring 集成而犯了错误?是否有可能以某种方式组合 Poller
和 @ServiceActivator
?
此外,我无法想象我配置的 IntegrationFlow 实际上是如何 "flows",也许有人可以帮助我更好地理解这一点。
编辑:
Artems 发表评论后,我修改了我的配置。现在看起来像这样:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
我还从 CSVProcessingService#process
方法中删除了 @ServiceActivator
注释。
不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。
您可能会错过这样一个事实,即您不需要中间的 QueueChannel
,因为 AbstractConnectionFactory.processNioSelections()
已经是多线程的,它会安排一个任务从套接字读取消息。所以,你只需要为Tcp.nioServer()
配置一个合适的Executor
。尽管默认情况下它是 Executors.newCachedThreadPool()
。
另一方面,使用内存 QueueChannel
您确实可能会丢失消息,因为它们已经从网络中读取。
当你做 Java DSL 时,你应该考虑在端点上使用 poller()
选项。 @Poller
将在 @ServiceActivator
上工作,如果你在那里有 inputChannel
属性,但在 handle()
中使用相同的属性将覆盖 inputChannel
,所以你的 @Poller
将不会应用。不要混淆 Java DSL 和注解配置!
您的配置中的其他一切都很好。
我正在尝试配置一个 TCP 套接字,以在不同的消息中接收格式为 name,value
的数据。这些消息平均每秒到达一次,时快时慢。
我能够设置一个工作配置,但我对 Spring 集成中实际发生的事情缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
CSVProcessingService
看起来像这样(缩写):
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
- 在配置的端口上接收消息
- 承受更高的负载而不丢失消息
- 反序列化消息
- 放入排队通道
- (最好也记录错误)
- 队列通道每 50 毫秒轮询一次,消息从队列通道传递到
ObjectToStringTransformer
- 转换后的消息被传递给
CSVProcessingService
进一步处理
我是否正确地实现了所有这些目标,还是因为我误解了 Spring 集成而犯了错误?是否有可能以某种方式组合 Poller
和 @ServiceActivator
?
此外,我无法想象我配置的 IntegrationFlow 实际上是如何 "flows",也许有人可以帮助我更好地理解这一点。
编辑:
Artems 发表评论后,我修改了我的配置。现在看起来像这样:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
我还从 CSVProcessingService#process
方法中删除了 @ServiceActivator
注释。
不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。
您可能会错过这样一个事实,即您不需要中间的 QueueChannel
,因为 AbstractConnectionFactory.processNioSelections()
已经是多线程的,它会安排一个任务从套接字读取消息。所以,你只需要为Tcp.nioServer()
配置一个合适的Executor
。尽管默认情况下它是 Executors.newCachedThreadPool()
。
另一方面,使用内存 QueueChannel
您确实可能会丢失消息,因为它们已经从网络中读取。
当你做 Java DSL 时,你应该考虑在端点上使用 poller()
选项。 @Poller
将在 @ServiceActivator
上工作,如果你在那里有 inputChannel
属性,但在 handle()
中使用相同的属性将覆盖 inputChannel
,所以你的 @Poller
将不会应用。不要混淆 Java DSL 和注解配置!
您的配置中的其他一切都很好。