EvaluationContext Null With Spring Integration 和 Kafka
EvaluationContext Null With Spring Integration and Kafka
我试图在 Spring 集成中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到 Kafka 队列中。为此,我使用 spring-integration-kafka。问题是我收到无法破译的 EvaluationContext
错误。
这是我在 XML 中的配置:
<int:channel id="myStreamChannel"/>
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" >
<int:method name="process" request-channel="myStreamChannel"/>
</int:gateway>
<int:channel id="activityOutputChannel"/>
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="false"
channel="activityOutputChannel"
topic="my-test"
message-key-expression="header.messageKey">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="kafkaserver.com:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="my-test"
key-encoder="stringEncoder"
value-encoder="stringEncoder"
compression-codec="snappy"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
当我 运行 我的应用程序通过 Spring 启动时,我得到这个异常:
创建名称为 'org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0' 的 bean 时出错:调用 init 方法失败;嵌套异常是 java.lang.IllegalArgumentException: [Assertion failed] - 这个参数是必需的;它不能为空
这是堆栈跟踪中的违规行:
在 org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:68)
这是第 68 行发生的事情:
Assert.notNull(this.evaluationContext);
因此 EvaluationContext
为空。我不知道为什么。
顺便说一句,当我用一个普通的 stdout
端点替换 Kafka 端点来打印消息正文时,一切正常。
你能告诉我我的 Kafka 端点配置有什么问题没有 EvaluationContext
可用吗?
您的问题属于版本不匹配地狱。
Spring Boot 拉取 Spring Integration Core 版本,该版本不支持 IntegrationEvaluationContextAware
填充 KafkaProducerMessageHandler
.
中的 EvaluationContext
因此,您应该将 Spring 集成 Kafka 升级到最新版本:https://github.com/spring-projects/spring-integration-kafka/releases
我试图在 Spring 集成中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到 Kafka 队列中。为此,我使用 spring-integration-kafka。问题是我收到无法破译的 EvaluationContext
错误。
这是我在 XML 中的配置:
<int:channel id="myStreamChannel"/>
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" >
<int:method name="process" request-channel="myStreamChannel"/>
</int:gateway>
<int:channel id="activityOutputChannel"/>
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="false"
channel="activityOutputChannel"
topic="my-test"
message-key-expression="header.messageKey">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="kafkaserver.com:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="my-test"
key-encoder="stringEncoder"
value-encoder="stringEncoder"
compression-codec="snappy"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
当我 运行 我的应用程序通过 Spring 启动时,我得到这个异常:
创建名称为 'org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0' 的 bean 时出错:调用 init 方法失败;嵌套异常是 java.lang.IllegalArgumentException: [Assertion failed] - 这个参数是必需的;它不能为空
这是堆栈跟踪中的违规行:
在 org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:68)
这是第 68 行发生的事情:
Assert.notNull(this.evaluationContext);
因此 EvaluationContext
为空。我不知道为什么。
顺便说一句,当我用一个普通的 stdout
端点替换 Kafka 端点来打印消息正文时,一切正常。
你能告诉我我的 Kafka 端点配置有什么问题没有 EvaluationContext
可用吗?
您的问题属于版本不匹配地狱。
Spring Boot 拉取 Spring Integration Core 版本,该版本不支持 IntegrationEvaluationContextAware
填充 KafkaProducerMessageHandler
.
EvaluationContext
因此,您应该将 Spring 集成 Kafka 升级到最新版本:https://github.com/spring-projects/spring-integration-kafka/releases