使用 MongoDB Sink 部署流时,我得到一个 MappingException "Couldn't find PersistentEntity for type class [B!"

When deploying a stream with MongoDB Sink I got a MappingException "Couldn't find PersistentEntity for type class [B!"

当我尝试使用 MongoDB Sink app starter 以及 JDBC 来源读取的信息时,我遇到了这个问题:

MappingException: Couldn't find PersistentEntity for type class [B!

Class[B!表示这是一个byte[]。我在日志中打印,这是对象的 JSON。这是一个错误吗?其他水槽运行良好。

我在本地部署中使用 Spring Cloud Data Flow 1.6.3。 应用启动器在 Darwin.SR1 版本。

这是 流定义:

jdbc --password='mypass' --query='SELECT id, name FROM mytable WHERE imported = false' --max-rows-per-poll=1000 --update='UPDATE mytable SET imported = true WHERE id in (:id)' --fixed-delay=30 --time-unit=SECONDS --driver-class-name=org.postgresql.Driver --url=jdbc:postgresql://localhost:5432/mydatabase --username=postgres | mongodb --database=main --port=27017 --host=localhost --collection=mycollection --uri=mongodb://localhost/main

完整堆栈跟踪:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'SystemOutAndSink.systemOutProcessor.SystemOutAndSink.errors'; nested exception is org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:98) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.support.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:164) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.handleMessage(RabbitMessageChannelBinder.java:513) ~[spring-cloud-stream-binder-rabbit-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 21 common frames omitted
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongoDbSinkMessageHandler]; nested exception is org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!, failedMessage=GenericMessage [payload=byte[26], headers={amqp_receivedDeliveryMode=PERSISTENT, sequenceNumber=1, amqp_receivedExchange=SystemOutAndSink.systemOutProcessor, amqp_deliveryTag=1, sequenceSize=1, deliveryAttempt=3, amqp_consumerQueue=SystemOutAndSink.systemOutProcessor.SystemOutAndSink, amqp_redelivered=false, amqp_receivedRoutingKey=SystemOutAndSink.systemOutProcessor, correlationId=33566e38-0f55-25be-40d6-5ef68ca496b2, id=e636d851-dc51-dbc3-99d6-3705ef71db59, amqp_consumerTag=amq.ctag-4DTfbrWGYdijPJZnyJzGpQ, contentType=application/json, timestamp=1538314765609}]
    ... 27 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongoDbSinkMessageHandler]; nested exception is org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:47) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access00(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage[=14=](AmqpInboundChannelAdapter.java:214) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    ... 11 common frames omitted
Caused by: org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!
    at org.springframework.data.mapping.context.MappingContext.getRequiredPersistentEntity(MappingContext.java:76) ~[spring-data-commons-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:435) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.write(MappingMongoConverter.java:391) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.write(MappingMongoConverter.java:86) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.toDocument(MongoTemplate.java:1070) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.doSave(MongoTemplate.java:1253) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1201) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.integration.mongodb.outbound.MongoDbStoringMessageHandler.handleMessageInternal(MongoDbStoringMessageHandler.java:124) ~[spring-integration-mongodb-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 28 common frames omitted

只要MongoTemplate.save()(本质上是mappingContext.getPersistentEntity(type))不支持简单的byte[](或类似的ByteBuffer或只是InputStream),我们别无选择,除非将入站 byte[] 转换为 String 以使其工作。

因此,为此您需要编写一个带有自动配置和 bean 的简单 jar,如下所示:

@Bean
@GlobalChannelInterceptor(patterns = Sink.INPUT)
public ChannelInterceptor bytesToStringChannelInterceptor() {
    return new ChannelInterceptor() {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            if (message.getPayload() instanceof byte[]) {
                String contentType = message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)
                        ? message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()
                        : BindingProperties.DEFAULT_CONTENT_TYPE.toString();
                if (contentType.contains("text") ||
                        contentType.contains("json") ||
                        contentType.contains("x-spring-tuple")) {

                    return new MutableMessage<>(new String(((byte[]) message.getPayload())), message.getHeaders());
                }
            }
            return message;

        }
    };
}

强制将 byte[] 入站转换为 MongoTemplate 上支持的 String

随时针对 MongoDB App Starter 提出问题以将此功能包含在框中。