Spring 集成 MongoDbStoringMessageHandler ClassCastException:BasicDBObject 无法转换为 BasicDBList

Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList

我开发了一个集成流程,我从 MongoDbMessageSource 获取用户,对于与用户关联的每个社交媒体,我都获取发给他的评论。

我想在 MongoDbStoringMessageHandler 的帮助下将它们保存在 MongoDB 中,链接到频道 storeChannel.

流程如下:

@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or 
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * 
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m
                        -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                    });
                                }
                            }))
                            .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                    });
                                }
                            }))
                            .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                    });
                                }
                            }))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }

}

错误前的调试信息是:

2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel    : preSend on channel 'storeChannel', message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler   : mongodbAdapter received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]

很明显频道 "storeChannel" 出现了 "CommentEntity"

的列表
@Document(collection="comments")
public class CommentEntity {

    @Id
    private ObjectId id;

    @Field("message")
    private String message;

    private ObjectId user;

    @PersistenceConstructor
    public CommentEntity(String message, ObjectId user) {
        this.message = message;
        this.user = user;
    }

    public ObjectId getId() {
        return id;
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public ObjectId getUser() {
        return user;
    }

    public void setUser(ObjectId user) {
        this.user = user;
    }

}

然后发生此异常:

2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]

我目前使用的是嵌入式 MongoDB:

<dependency>
  <groupId>de.flapdoodle.embed</groupId>
  <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>

有谁知道我做错了吗?提前致谢。

嗯,那个异常清楚地表明 MongoDbStoringMessageHandler 不支持 collection 保存:

protected void handleMessageInternal(Message<?> message) throws Exception {
    Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null");

    Object payload = message.getPayload();

    this.mongoTemplate.save(payload, collectionName);
}

您无需 .aggregate() 构建 collection 即可保存。你只能用那个组件一个一个保存。

我认为这应该是让该组件执行的一个很好的补充:

/**
 * Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
 * class.
 *
 * @param collectionToSave the list of objects to save.
 */
void insertAll(Collection<? extends Object> objectsToSave);

请就此事提出一个 JIRA,不要犹豫贡献!