使用 Spring 集成 DSL 邮件时出现 ConcurrentModificationException

ConcurrentModificationException while using Spring Integration DSL Mail

我正在尝试使用 Spring 集成 Java DSL 来接收电子邮件 (gmail)。我也在使用 Redis。当我收到邮件时,我得到了 ConcurrentModificationException 。在搜索时,我还发现人们在 Kafka 中也遇到了同样的错误。下面是我的代码和异常。

感谢您的帮助。

    Properties javaMailProperties = new Properties();
    javaMailProperties.setProperty("mail.imap.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
    javaMailProperties.setProperty("mail.imap.socketFactory.fallback", "false");
    javaMailProperties.setProperty("mail.store.protocol", "imaps");
    javaMailProperties.setProperty("mail.debug", "false");
    String imapUrl = "imaps://email%40gmail.com:password@imap.gmail.com:993/INBOX";



    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(Mail.imapInboundAdapter(imapUrl).javaMailProperties(javaMailProperties).shouldMarkMessagesAsRead(true)
            , new Consumer<SourcePollingChannelAdapterSpec>() {


        @Override
        public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
            sourcePollingChannelAdapterSpec
                    .poller(defaultPoller);
        }
    });
    return flowBuilder.channel(source.output()).get();

异常:

2016-04-16 09:19:51.449  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:31.463 ERROR 11640 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler@2f104438]; nested exception is com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access[=12=]0(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534)
    at org.springframework.integration.codec.kryo.PojoCodec.doEncode(PojoCodec.java:92)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec.execute(AbstractKryoCodec.java:66)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:61)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec.encode(AbstractKryoCodec.java:63)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:230)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:210)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.access0(RedisMessageChannelBinder.java:71)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler.handleMessageInternal(RedisMessageChannelBinder.java:295)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    ... 28 more
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:92)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    ... 84 more

2016-04-16 09:21:36.188  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:39.104  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]

看来我们需要为 MimeMessage 定制一个 kryo 序列化程序(由 Spring Cloud Stream 使用)。我开了一个JIRA Issue反对Spring整合

但是我测试的时候出现了栈溢出错误,不是你看到的异常。

如果您可以将 mail.debug javamail 属性 设置为 true 的控制台输出附加到 JIRA 问题,并附上一条显示您的问题的消息,那将会很有帮助,所以我们可以确定我们也解决了这个问题,因为这样我们就可以重建一个类似的 MimeMessage.

编辑

初步分析显示问题是在 MIME 消息的 session 字段中序列化 java.util.Logger

此外,事实证明这个 object (AbstractMailReceiver.IntegrationMimeMessage) 根本不适合序列化,因为它在其范围内具有整个应用程序上下文,因为它是一个嵌套的 class邮件接收者。

因此,我建议在邮件模块中添加一个转换器,将此 object 转换为适合任何类型序列化的转换器。

EDIT2

一种解决方法是将 .transform(mailToStringTransformer()) 添加到流程中。邮件内容将是邮件的body,发件人、密件抄送、抄送、收件人、回复和主题将添加到headers。