Spring 在同一应用程序中声明生产者和消费者时,云流不向 Kafka 发送消息
Spring cloud stream not send message to Kafka when declare producer and consumer in the same application
目前我在同一个 Spring 引导应用程序中配置了生产者和消费者,但是非常奇怪的是 Spring 云流发送消息没有通过 Kafka(我是使用 kafka-console-consumer 监控消息),但消费者仍然收到消息(使用与生产者相同的线程)。
如果我在应用程序中删除 consumerHandler (@StreamListener),生产者会成功将消息发送到 Kafka。
这个有什么配置吗?我需要 Spring 云流默认向 Kafka 发送消息。
生产者和消费者配置:
@Component
public interface NotificationProcessor {
String EMAIL_NOTIFICATION = "email-notification";
@Input(EMAIL_NOTIFICATION)
SubscribableChannel receiveEmail();
@Output(EMAIL_NOTIFICATION)
MessageChannel sendEmail();
}
这是我的一些配置:
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: ${KAFKA_BROKERS:localhost:9092}
auto-create-topics: true
configuration:
auto.offset.reset: latest
bindings:
email-notification:
group: ${EMAIL_GROUP:email-group-notification}
destination: ${EMAIL_TOPIC:email-notification}
contentType: application/json
producer:
partitionCount: 9
consumer:
partitioned: true
concurrency: 3
instance-count: 1
instance-index: 0
一个API触发发送消息:
@RestController
@RequestMapping("/api")
public class TestResource {
private final Logger log = LoggerFactory.getLogger(TestResource.class);
private final NotificationProcessor notificationProcessor;
public TestResource(NotificationProcessor notificationProcessor) {
this.notificationProcessor = notificationProcessor;
}
@ApiOperation(value = "Test api")
@GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Boolean> test2() {
EmailMessage test = EmailMessage.builder()
.to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
).type(EContentType.JSON)
.build();
log.info("send email message to kafka");
notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
return ResponseEntity.ok(Boolean.TRUE);
}
}
和消费者处理程序:
@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {
private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);
public NotificationProducer(){}
@StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
public void receiveEmail(@Payload Message<EmailMessage> message) {
log.info("Receive email message from kafka");
EmailMessage emailMessage = message.getPayload();
}
}
从所提供的信息中不清楚您要将消息发送到哪里。 . .什么频道?默认情况下,频道是内部的和直接的,因此如果您发送到您订阅的同一频道,您将完全绕过消息代理(即 Kafka)。这可以解释这两种症状(没有代理和相同的线程)。
也就是说,基于注释的配置模型已被弃用。在过去的几年里,我们完全迁移到 functional programming model,它更简单,并且旨在帮助您不去考虑通道等内部实现,因为它们确实供内部使用(您的代码和代理适配器)。
还有一个新组件可以让您将消息发送到专门为您所拥有的场景设计的代理 - StreamBridge。
无论如何,看一看并考虑重构您的应用程序。至少确保您发送到绑定到代理目的地的通道并订阅绑定到同一目的地的另一个通道,从而确保往返代理的发生。
最后但同样重要的是,我仍然很困惑为什么你需要发送给经纪人然后在同一个应用程序中订阅它?为什么网络开销?
目前我在同一个 Spring 引导应用程序中配置了生产者和消费者,但是非常奇怪的是 Spring 云流发送消息没有通过 Kafka(我是使用 kafka-console-consumer 监控消息),但消费者仍然收到消息(使用与生产者相同的线程)。
如果我在应用程序中删除 consumerHandler (@StreamListener),生产者会成功将消息发送到 Kafka。
这个有什么配置吗?我需要 Spring 云流默认向 Kafka 发送消息。
生产者和消费者配置:
@Component
public interface NotificationProcessor {
String EMAIL_NOTIFICATION = "email-notification";
@Input(EMAIL_NOTIFICATION)
SubscribableChannel receiveEmail();
@Output(EMAIL_NOTIFICATION)
MessageChannel sendEmail();
}
这是我的一些配置:
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: ${KAFKA_BROKERS:localhost:9092}
auto-create-topics: true
configuration:
auto.offset.reset: latest
bindings:
email-notification:
group: ${EMAIL_GROUP:email-group-notification}
destination: ${EMAIL_TOPIC:email-notification}
contentType: application/json
producer:
partitionCount: 9
consumer:
partitioned: true
concurrency: 3
instance-count: 1
instance-index: 0
一个API触发发送消息:
@RestController
@RequestMapping("/api")
public class TestResource {
private final Logger log = LoggerFactory.getLogger(TestResource.class);
private final NotificationProcessor notificationProcessor;
public TestResource(NotificationProcessor notificationProcessor) {
this.notificationProcessor = notificationProcessor;
}
@ApiOperation(value = "Test api")
@GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Boolean> test2() {
EmailMessage test = EmailMessage.builder()
.to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
).type(EContentType.JSON)
.build();
log.info("send email message to kafka");
notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
return ResponseEntity.ok(Boolean.TRUE);
}
}
和消费者处理程序:
@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {
private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);
public NotificationProducer(){}
@StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
public void receiveEmail(@Payload Message<EmailMessage> message) {
log.info("Receive email message from kafka");
EmailMessage emailMessage = message.getPayload();
}
}
从所提供的信息中不清楚您要将消息发送到哪里。 . .什么频道?默认情况下,频道是内部的和直接的,因此如果您发送到您订阅的同一频道,您将完全绕过消息代理(即 Kafka)。这可以解释这两种症状(没有代理和相同的线程)。
也就是说,基于注释的配置模型已被弃用。在过去的几年里,我们完全迁移到 functional programming model,它更简单,并且旨在帮助您不去考虑通道等内部实现,因为它们确实供内部使用(您的代码和代理适配器)。
还有一个新组件可以让您将消息发送到专门为您所拥有的场景设计的代理 - StreamBridge。
无论如何,看一看并考虑重构您的应用程序。至少确保您发送到绑定到代理目的地的通道并订阅绑定到同一目的地的另一个通道,从而确保往返代理的发生。
最后但同样重要的是,我仍然很困惑为什么你需要发送给经纪人然后在同一个应用程序中订阅它?为什么网络开销?