Spring XD losing messages when processing huge volume

I am using Spring XD. My stream looks like the one below. For testing it runs on 3 container nodes with 1 admin node, using RabbitMQ as the transport.

aws-s3|processor1|http-client|processor2>queue:readyQueue

I created the taps below.

tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

I ran the following scenarios in my tests:

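Scenario 1: 5 files of 200k records = 1 million records, with concurrency http-client=70 and processor2=30

I see 900k messages in s3Queue

I see 889k messages in processorQueue1

I see 886k messages in httpQueue

I see 883k messages in processor queue 2. Messages are lost at every stage, at random.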

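Scenario 2: 5 files of 200k records = 1 million records, with concurrency=1 for all modules

I see 998800 messages in s3Queue

I see 998760 messages in processorQueue1

I see 997540 messages in httpQueue

I see 997530 messages in processor queue 2

Even these numbers are random and inconsistent between runs.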
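As a rough way to see where messages go missing, the tap counts reported for Scenarios 1 and 2 can be turned into per-hop drops. The sketch below is plain Java; the class and method names are made up for illustration, and the starting value of 1,000,000 assumes that exactly 5 × 200k records were read from S3. The "900k"-style figures are the rounded values quoted above.

```java
import java.util.Arrays;

public class StageLoss {
    /** Returns counts[i] - counts[i + 1] for each adjacent pair of stage counts. */
    public static long[] dropsPerStage(long[] counts) {
        if (counts.length < 2) {
            return new long[0];
        }
        long[] drops = new long[counts.length - 1];
        for (int i = 0; i < drops.length; i++) {
            drops[i] = counts[i] - counts[i + 1];
        }
        return drops;
    }

    public static void main(String[] args) {
        // Order: records in files, s3Queue, processorQueue1, httpQueue, processor queue 2
        long[] scenario1 = {1_000_000, 900_000, 889_000, 886_000, 883_000};
        long[] scenario2 = {1_000_000, 998_800, 998_760, 997_540, 997_530};
        System.out.println(Arrays.toString(dropsPerStage(scenario1))); // [100000, 11000, 3000, 3000]
        System.out.println(Arrays.toString(dropsPerStage(scenario2))); // [1200, 40, 1220, 10]
    }
}
```

With these figures, drops show up at every hop in both scenarios, including between the source and the first tap.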

Scenario 3

I changed the stream as shown below, with concurrency=1 and 5 files of 200k records = 1 million records

aws-s3 >testQueue

I received all my messages. I ran it 3 times with no issues and received all 1 million messages each time.

Scenario 4

I changed the stream as shown below, with concurrency=1 and 5 files of 200k records = 1 million records

aws-s3 |processor1 >testQueue2

I received all my messages. I ran it 3 times with no issues and received all 1 million messages each time.

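In Scenarios 3 and 4, data ingestion is faster: processing 5 million records takes 5 minutes, and ingestion into the Rabbit transport queue runs at around 5k messages per second.

In Scenario 1, even the s3 module pulls data very slowly, at around 300 to 1000 messages per second, so ingestion as a whole is slower.

In Scenario 2, aws-s3 pulls data faster, at around 3-4k messages per second, but http-client is slow, at around 100 messages per second.

I think XD threading is causing the problem and I am losing messages. Can you please help me solve this?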

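Update

Scenario 5

I changed reply-timeout to -1 in http-client, and then I lost only 37 messages.

Then I ran a second iteration and lost 25000 messages. This is what I saw in the container log when it happened: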

2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access0(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access00(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

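Updated

I found that messages are lost when the exception below occurs, and I see a lot of messages lost. I tested this pattern multiple times: every time this exception occurs, I see messages lost. The problem also shows up more often as concurrency is increased.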

2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

Rabbit config

spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: rabbit@host1.test.com,rabbit@host2.test.com,rabbit@host2.test.com
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

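Update after increasing the cache size to 200

I added the XML you provided and increased the cache size to 200. This is what happened when processing 1 million and 80k messages. Only my http-client concurrency is 100; all other modules are at 1. Processing slowly stopped: messages still sit in the http-client queue and the count stays the same. The message count in my named channel is still increasing, but very slowly, at about 10 messages per minute.

s3-poller|processor|http-client>queue:batchCacheQueue

The message count in the queue before http-client stays at 186174 and is not decreasing, but messages are still arriving in batchCacheQueue.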

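Test case to reproduce:

1) I am using a Spring Integration aws-s3 source and splitter in a composite module | a processor such as XML parsing | http-client with concurrency 100 > named channel.

2) I think the file source may also work. Create a single file with a million records and try to ingest it from the file source.

3) After about 4 to 5 runs, we see this exception: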

Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364

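We found an issue where channels are being churned heavily; you need to increase the channel cache size in the Rabbit caching connection factory.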


I opened a JIRA issue so that the next version of Spring XD will expose this setting in servers.yml, so you don't have to override the bus configuration file.
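To illustrate why a small channel cache leads to heavy churn, here is a toy model in plain Java. It is not Spring AMQP code: the class and method names are hypothetical, and it assumes the worst case in which every worker holds a channel at the same moment. The model follows the usual caching behaviour, where a returned channel is kept only if the cache has room and is closed otherwise. The concurrency of 100 and the cache size of 200 come from the question's update; the cache size of 1 is only an assumed small value for comparison, so check the default for your Spring AMQP version. In Spring AMQP itself the corresponding setting is the `channelCacheSize` property of `CachingConnectionFactory`.

```java
public class ChannelChurn {
    /**
     * Counts how many channels are opened over the given number of rounds.
     * In each round every worker checks out a channel (reusing a cached one
     * when available, opening a new one otherwise), then every worker returns
     * its channel. Returned channels beyond cacheSize are closed, not cached.
     */
    public static long channelsOpened(int concurrency, int cacheSize, int rounds) {
        int cached = 0;
        long opened = 0;
        for (int r = 0; r < rounds; r++) {
            int reused = Math.min(cached, concurrency);
            opened += concurrency - reused;   // cache misses open new channels
            cached -= reused;
            // All workers return; only up to cacheSize channels stay cached.
            cached = Math.min(cacheSize, cached + concurrency);
        }
        return opened;
    }

    public static void main(String[] args) {
        System.out.println(channelsOpened(100, 1, 1000));   // 99001
        System.out.println(channelsOpened(100, 200, 1000)); // 100
    }
}
```

In this model, a cache smaller than the concurrency opens roughly one new channel per worker per round, which is the kind of churn that can drive channel numbers into the tens of thousands. A cache at least as large as the concurrency opens channels only once.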