Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList
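I have developed an integration flow in which I get the users from a MongoDbMessageSource and, for each social media account associated with a user, I fetch the comments addressed to him.
I want to persist those comments in MongoDB with the help of a MongoDbStoringMessageHandler, which is linked to the channel storeChannel.
The flow is as follows: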
@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     *
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId) headers.get("user-id");
                                        logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] {
                                            new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                                .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId) headers.get("user-id");
                                        logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] {
                                            new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                                .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                    @Override
                                    public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                        ObjectId userId = (ObjectId) headers.get("user-id");
                                        logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                        return Arrays.asList(new CommentEntity[] {
                                            new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                            new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                        });
                                    }
                                }))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }
}
The debug output just before the error is:
2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel : preSend on channel 'storeChannel', message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler : mongodbAdapter received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
Clearly, a list of CommentEntity objects arrives on the channel "storeChannel". CommentEntity is defined as follows:
@Document(collection="comments")
public class CommentEntity {

    @Id
    private ObjectId id;

    @Field("message")
    private String message;

    private ObjectId user;

    @PersistenceConstructor
    public CommentEntity(String message, ObjectId user) {
        this.message = message;
        this.user = user;
    }

    public ObjectId getId() {
        return id;
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public ObjectId getUser() {
        return user;
    }

    public void setUser(ObjectId user) {
        this.user = user;
    }
}
Then this exception occurs:
2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
I am currently using an embedded MongoDB:
<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
Does anyone know what I am doing wrong? Thanks in advance.
Well, that exception clearly indicates that MongoDbStoringMessageHandler does not support saving a collection:
protected void handleMessageInternal(Message<?> message) throws Exception {
    Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null");
    Object payload = message.getPayload();
    this.mongoTemplate.save(payload, collectionName);
}
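You don't need the .aggregate() steps to build a collection to save. With that component you can only save the entities one by one.
I think it would be a good addition to let that component perform: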
/**
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
* class.
*
* @param collectionToSave the list of objects to save.
*/
void insertAll(Collection<? extends Object> objectsToSave);
Please raise a JIRA on the matter, and don't hesitate to contribute!
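To illustrate the "one by one" point, here is a minimal plain-Java sketch. It does not use Spring at all: SingleDocumentStore, InMemoryStore and saveEach are hypothetical names invented for this example, and the in-memory store only imitates a component that accepts a single document per call. It shows the list being split so that each element is handed to the store on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OneByOneSaveSketch {

    /** Hypothetical stand-in for a store that accepts exactly one document per call. */
    interface SingleDocumentStore {
        void save(Object document, String collectionName);
    }

    /** In-memory store that rejects collection payloads, imitating the limitation discussed above. */
    static class InMemoryStore implements SingleDocumentStore {
        final List<Object> saved = new ArrayList<>();

        @Override
        public void save(Object document, String collectionName) {
            if (document instanceof Iterable) {
                throw new IllegalArgumentException("one document per call expected");
            }
            saved.add(document);
        }
    }

    /** Splits the payload and saves each element separately; returns how many were saved. */
    static int saveEach(List<?> payload, SingleDocumentStore store, String collectionName) {
        int count = 0;
        for (Object item : payload) {
            store.save(item, collectionName);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        List<String> comments = Arrays.asList("comment 1", "comment 2", "comment 3");
        int n = saveEach(comments, store, "comments");
        System.out.println(n + " documents saved: " + store.saved);
    }
}
```

In the flow from the question, the equivalent would presumably be to let each CommentEntity reach storeChannel as its own message, for example by dropping the .aggregate() steps or by adding a .split() step right before .channel("storeChannel"). I have not run that against this exact configuration, so treat it as a suggestion to verify.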