Spring 集成 DSL KafkaProducerContext 配置
Spring Integration DSL KafkaProducerContext configuration
我正在尝试改编以下示例:
https://github.com/joshlong/spring-and-kafka
使用以下库的最新稳定版本:
org.apache.kafka > kafka_2.10 > 0.8.2.2
org.springframework.integration > spring-integration-kafka > 1.2.1.RELEASE
org.springframework.integration > spring-integration-java-dsl > 1.1.0.RELEASE
集成 dsl 库似乎经历了重构,可能是由于引入了新的 KafkaProducer。
这是我的生产者配置代码:
@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
log.info("starting producer flow..");
return flowDefinition -> {
ProducerMetadata<String, String> getProducerMetadata = new ProducerMetadata<>(this.kafkaConfig.getTopic(),
String.class, String.class, new StringSerializer(), new StringSerializer());
KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
props.put("timeout.ms", "35000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
.get();
flowDefinition
.handle(kafkaProducerMessageHandler);
};
}
消息生成代码:
@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
return args -> {
for (int i = 0; i < 1000; i++) {
in.send(MessageBuilder.withPayload("#" + i).setHeader(KafkaHeaders.TOPIC, this.kafkaConfig.getTopic()).build());
log.info("sending message #" + i);
}
};
}
那是我得到的异常:
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at jc.DemoApplication$ProducerConfiguration.lambda$kickOff[=14=](DemoApplication.java:104)
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:673)
... 10 more
Caused by: java.lang.NullPointerException
at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:67)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:201)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 18 more
更新:
完整的工作源代码可以在我的 fork 中找到:
https://github.com/magiccrafter/spring-and-kafka
抱歉耽搁了。
您的问题与早期 IntegrationComponentSpec
实例化有关:
KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
props.put("timeout.ms", "35000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
.get();
你不应该给自己打电话 .get()
。
KafkaProducerMessageHandlerSpec
是 ComponentsRegistration
并且只有 SI Java DSL 才能正确解析它。那里的代码看起来像:
public Collection<Object> getComponentsToRegister() {
this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
return Collections.<Object>singleton(this.kafkaProducerContext);
}
由于未调用此代码,因此 this.producerConfigurations
未填充到 this.kafkaProducerContext
。虽然最后一个无论如何都要注册成bean
因此,要解决您的问题,您应该只处理 DSL 定义中的 IntegrationComponentSpec
。
只需获得 KafkaProducerMessageHandlerSpec
并将其用于下面的 .handle()
。如果我们可以直接从 .handle()
.
使用 Kafka.outboundChannelAdapter()
,不确定是否有理由提取此对象
我正在尝试改编以下示例: https://github.com/joshlong/spring-and-kafka
使用以下库的最新稳定版本:
org.apache.kafka > kafka_2.10 > 0.8.2.2
org.springframework.integration > spring-integration-kafka > 1.2.1.RELEASE
org.springframework.integration > spring-integration-java-dsl > 1.1.0.RELEASE
集成 dsl 库似乎经历了重构,可能是由于引入了新的 KafkaProducer。
这是我的生产者配置代码:
@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
log.info("starting producer flow..");
return flowDefinition -> {
ProducerMetadata<String, String> getProducerMetadata = new ProducerMetadata<>(this.kafkaConfig.getTopic(),
String.class, String.class, new StringSerializer(), new StringSerializer());
KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
props.put("timeout.ms", "35000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
.get();
flowDefinition
.handle(kafkaProducerMessageHandler);
};
}
消息生成代码:
@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
return args -> {
for (int i = 0; i < 1000; i++) {
in.send(MessageBuilder.withPayload("#" + i).setHeader(KafkaHeaders.TOPIC, this.kafkaConfig.getTopic()).build());
log.info("sending message #" + i);
}
};
}
那是我得到的异常:
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at jc.DemoApplication$ProducerConfiguration.lambda$kickOff[=14=](DemoApplication.java:104)
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:673)
... 10 more
Caused by: java.lang.NullPointerException
at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:67)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:201)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 18 more
更新:
完整的工作源代码可以在我的 fork 中找到:
https://github.com/magiccrafter/spring-and-kafka
抱歉耽搁了。
您的问题与早期 IntegrationComponentSpec
实例化有关:
KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
props.put("timeout.ms", "35000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
.get();
你不应该给自己打电话 .get()
。
KafkaProducerMessageHandlerSpec
是 ComponentsRegistration
并且只有 SI Java DSL 才能正确解析它。那里的代码看起来像:
public Collection<Object> getComponentsToRegister() {
this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
return Collections.<Object>singleton(this.kafkaProducerContext);
}
由于未调用此代码,因此 this.producerConfigurations
未填充到 this.kafkaProducerContext
。虽然最后一个无论如何都要注册成bean
因此,要解决您的问题,您应该只处理 DSL 定义中的 IntegrationComponentSpec
。
只需获得 KafkaProducerMessageHandlerSpec
并将其用于下面的 .handle()
。如果我们可以直接从 .handle()
.
Kafka.outboundChannelAdapter()
,不确定是否有理由提取此对象