IntegrationFlow for Kafka Message error while configureListenerContainer
IntegrationFlow for Kafka Message error while configureListenerContainer
我正在尝试使用 IntegrationFlow for kafka 将从 Kafka 收到的消息传递到通道。
下面是我的工作代码:-
@Bean
public MessageChannel fromKafka() {
return new DirectChannel();
}
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow1() throws Exception {
/* return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, kafkaTopic)
.configureListenerContainer( c -> c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(messageFromKafka(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();*/
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(listener(), KafkaMessageDrivenChannelAdapter.ListenerMode.record))
.channel("fromKafka")
.get();
}
@Bean("listenerkafka")
public KafkaMessageListenerContainer<String, String> listener() throws Exception {
ContainerProperties properties = new ContainerProperties(kafkaTopic1);
properties.setGroupId("kafka-test");
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@ServiceActivator(inputChannel="fromKafka", outputChannel = "somechannel")
public Message<CreatRequest> fromKafka(Message<?> msg) throws JsonProcessingException {
CreatRequest creatRequest = objectMapper.readValue(msg.getPayload().toString(), CreatRequest.class);
Message<CreatRequest> message= MessageBuilder.withPayload(creatRequest).build();
logger.info("Inside fromKafka " + message);
return message;
}
我面临的问题是注释代码在 topic1ListenerFromKafkaFlow1 中不起作用。
在这里我找不到 c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
因为它显示无法识别编译时错误ackmode。
你能纠正我哪里错了吗
此外,我需要在另一个线程而不是主线程中传递此流程。
改为使用 Kafka message-driven 通道适配器:
https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound
但是,如果两个适配器在同一个通道上,请求将 round-robin 在它们之间分配。如果你想让两者都收到消息,你需要一个 PublishSubscribeChannel
.
我正在尝试使用 IntegrationFlow for kafka 将从 Kafka 收到的消息传递到通道。
下面是我的工作代码:-
@Bean
public MessageChannel fromKafka() {
return new DirectChannel();
}
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow1() throws Exception {
/* return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, kafkaTopic)
.configureListenerContainer( c -> c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(messageFromKafka(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();*/
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(listener(), KafkaMessageDrivenChannelAdapter.ListenerMode.record))
.channel("fromKafka")
.get();
}
@Bean("listenerkafka")
public KafkaMessageListenerContainer<String, String> listener() throws Exception {
ContainerProperties properties = new ContainerProperties(kafkaTopic1);
properties.setGroupId("kafka-test");
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@ServiceActivator(inputChannel="fromKafka", outputChannel = "somechannel")
public Message<CreatRequest> fromKafka(Message<?> msg) throws JsonProcessingException {
CreatRequest creatRequest = objectMapper.readValue(msg.getPayload().toString(), CreatRequest.class);
Message<CreatRequest> message= MessageBuilder.withPayload(creatRequest).build();
logger.info("Inside fromKafka " + message);
return message;
}
我面临的问题是注释代码在 topic1ListenerFromKafkaFlow1 中不起作用。 在这里我找不到 c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
因为它显示无法识别编译时错误ackmode。 你能纠正我哪里错了吗
此外,我需要在另一个线程而不是主线程中传递此流程。
改为使用 Kafka message-driven 通道适配器:
https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound
但是,如果两个适配器在同一个通道上,请求将 round-robin 在它们之间分配。如果你想让两者都收到消息,你需要一个 PublishSubscribeChannel
.