Spring 集成:未能在超时内向通道 'executionFilterChannel' 发送消息:-1

Spring integration : Failed to send message to channel 'executionFilterChannel' within timeout: -1

当我尝试使用通道消息存储(channel message store)时,出现以下错误:org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'executionFilterChannel' within timeout: -1

        <!-- Channels used by the flow. executionFilterOutputChannel and testChannel are declared
             without any sub-elements. -->
        <int:channel id="executionFilterOutputChannel" />   
        <int:channel id="testChannel" /> 
        <!-- Queue-backed channel: its queue uses the channelMessageStore bean as message store,
             and kafkaIngestionInterceptor is registered as a channel interceptor on it. -->
        <int:channel id="executionFilterChannel">
           <int:queue message-store="channelMessageStore" />
                <int:interceptors>
                    <int:ref bean="kafkaIngestionInterceptor" />
                </int:interceptors>
        </int:channel>
        <!-- Interceptor bean referenced by executionFilterChannel above. -->
        <bean id="kafkaIngestionInterceptor" class="com.xyz.report.interceptor.KafkaIngestionInterceptor" />

    <!-- JDBC-backed channel message store referenced by the queue of executionFilterChannel.
         It is wired with the shared dataSource, a query provider bean named queryProvider
         (not declared in this snippet), and the table prefix QUEUE_. -->
    <bean id="channelMessageStore"
          class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource" />
        <property name="channelMessageStoreQueryProvider" ref="queryProvider" />
        <property name="tablePrefix" value="QUEUE_" />
    </bean>

        <!-- Filter reading from testChannel. It invokes the productFilter method on the
             executionFilter bean; output-channel is executionFilterOutputChannel and
             discard-channel is exceptionFilterChannel.
             NOTE(review): neither the executionFilter bean nor exceptionFilterChannel is declared
             in this snippet; presumably they are defined elsewhere. Confirm. -->
        <int:filter id="executionFilters"
                input-channel="testChannel" ref="executionFilter"
                method="productFilter" output-channel="executionFilterOutputChannel"
                discard-channel="exceptionFilterChannel" />

        <!-- Polled bridge that moves messages from the queue channel executionFilterChannel to
             testChannel. The poller is configured with fixed-delay="10", at most one message
             per poll, and runs transactionally using txManager. -->
        <int:bridge id="bridgeChannel" input-channel="executionFilterChannel"
                output-channel="testChannel">
                <int:poller fixed-delay="10" max-messages-per-poll="1" >
                    <int:transactional transaction-manager="txManager" />
                </int:poller>
            </int:bridge>
            <!-- Transaction manager used by the poller above; bound to the same dataSource
                 bean that the channelMessageStore uses. -->
            <bean id="txManager"
                class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
                <property name="dataSource" ref="dataSource" />
            </bean>

    <!-- Service activator that consumes from outputFromKafkaIngestion, invokes getKafkaMessage
         on an inner KafkaListener bean, and sends the result to executionFilterChannel
         (the channel that carries the interceptor and the JDBC-backed queue). -->
    <int:service-activator id="outputKafkaActivator"
            input-channel="outputFromKafkaIngestion"
            output-channel="executionFilterChannel" method="getKafkaMessage">
            <bean class="com.xyz.report.service.KafkaListener" />
        </int:service-activator>

KafkaIngestionInterceptor 是通道 "executionFilterChannel" 的通道拦截器。它包含以下方法:

/**
 * Interceptor hook invoked before a message is sent to the channel.
 * Prints the target channel and the message to standard output.
 *
 * @param message the message about to be sent
 * @param channel the channel the message is being sent to
 * @return always {@code null}
 */
@Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        System.out.println("Before Sending =======================> " + channel  + "message : "+ message);
        // NOTE(review): this returns null rather than the message. Per the preSend contract,
        // a null return means the actual send invocation will not occur; confirm this is intended.
        return null;
    }

完整日志:-

Message received from Kafka  ================================> GenericMessage [payload=EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed], headers={kafka_offset=73, PRODUCT=Equity, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=datapipeline.kafka.ingestion.test.1, kafka_receivedTimestamp=1587920627054, EVENT=CREATE, REGION=APAC}]
Before Sending =======================> executionFilterChannelmessage : GenericMessage [payload=EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed], headers={kafka_offset=73, PRODUCT=Equity, kafka_timestampType=CREATE_TIME, id=dad27393-5ef6-9280-8d3e-60c458312d65, kafka_receivedPartitionId=0, kafka_receivedTopic=datapipeline.kafka.ingestion.test.1, kafka_receivedTimestamp=1587920627054, EVENT=CREATE, REGION=APAC, timestamp=1587920627094}]
2020-04-26 22:33:47.096 ERROR 22640 --- [aContainer1-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = datapipeline.kafka.ingestion.test.1, partition = 0, offset = 73, CreateTime = 1587920627054, serialized key size = -1, serialized value size = 335, headers = RecordHeaders(headers = [RecordHeader(key = PRODUCT, value = [34, 69, 113, 117, 105, 116, 121, 34]), RecordHeader(key = EVENT, value = [34, 67, 82, 69, 65, 84, 69, 34]), RecordHeader(key = REGION, value = [34, 65, 80, 65, 67, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 80, 82, 79, 68, 85, 67, 84, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 69, 86, 69, 78, 84, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 82, 69, 71, 73, 79, 78, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed])

org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'executionFilterChannel' within timeout: -1
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:118) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:925) [spring-kafka-1.3.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:909) [spring-kafka-1.3.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:860) [spring-kafka-1.3.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.3.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:622) [spring-kafka-1.3.2.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_211]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_211]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_211]

请帮助我,因为我已经卡住了 3 天了。

你在拦截器中的代码是这样的:

/**
 * Interceptor hook invoked before a message is sent to the channel.
 * Prints the target channel and the message to standard output.
 *
 * @param message the message about to be sent
 * @param channel the channel the message is being sent to
 * @return always {@code null}
 */
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
    System.out.println("Before Sending =======================> " + channel  + "message : "+ message);
    // Returning null (instead of the message) tells the channel not to perform the send,
    // so nothing reaches executionFilterChannel.
    return null;
}

你在 preSend 中返回了 null,因此实际上没有任何消息被发送到通道。这就是你得到该异常的原因。

请查看 preSend 方法的 Javadoc:

/**
 * Invoked before the Message is actually sent to the channel.
 * This allows for modification of the Message if necessary.
 * If this method returns {@code null} then the actual
 * send invocation will not occur.
 */
@Nullable
default Message<?> preSend(Message<?> message, MessageChannel channel) {

为方便排查,框架在这种情况下还会输出一条 DEBUG 级别的日志:

message = interceptor.preSend(message, channel);
if (message == null) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(interceptor.getClass().getSimpleName()
                                + " returned null from preSend, i.e. precluding the send.");
        }
        afterSendCompletion(previous, channel, false, null, interceptorStack);
        return null;
}