Spring Cloud Stream - 第一条 Kafka 消息出错 "Dispatcher has no subscribers"
Spring Cloud Stream - First Kafka messages get error "Dispatcher has no subscribers"
我的应用程序成功发送了 Kafka 消息,但仅在 Kafka 初始化之后。在此之前,我收到错误 "Dispatcher has no subscribers"。我如何等待订阅者完成频道注册?
这是事件顺序的轨迹(时间在 second.ms 中):
- 17.165 创建了 SenderClass
- 17.816初始化class,@PostConstruct启动PollingTask
- 24.781 PollingTask 发送第一条 Kafka 消息
- 24.816 第一个错误:"Dispatcher has no subscribers"
- 25.778 正在注册 MessageChannel 我的频道
- 仍然看到调度程序错误
- 27.067 频道我的频道'有 1 个订阅者
- 此后没有更多错误,消息发送正常
我不确定如何处理这个问题。疯狂的猜测包括:
- 将发送代码放在@PostConstruct
- 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到发件人
- 将@AutoConfigureAfter(BindingServiceConfiguration.class) 添加到 SenderClass
- 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到 Main
- 将@DependsOn({"EnableBindingClass"}) 放在任务上
- 将 @DependsOn({"ApplicationLifeCycle"}) 放在 SchedulerClass 上,其中 ApplicationLifeCycle 是一个 class,除了
使用返回 MAX_INT
的 getPhase 实现 SmartLifecycle
- 确保对整个包启用 ComponentScan(来自其他 SO 线程的建议)
- 以上的各种组合
创建了一个新的应用程序,尽可能简单:
public interface Source {
@Output(channelName)
MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
@Autowired
private Source source;
public boolean send(SomeObject object) {
return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
}
@Service
public class Scheduler {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@PostConstruct
public void initialize() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}
编辑以添加解决方案
现在可以使用了!在我的启动发送消息的调度程序中,我从 @PostConstruct 中的启动切换到 SmartLifecycle::start().
@Service
public class Scheduler implements SmartLifecycle {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@Override
public void start() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}
@PostConstruct 发送消息太早;上下文仍在构建中。实施 SmartLifecycle,将 bean 置于高阶段 (Integer.MAX_VALUE) 并在 start() 中执行发送。
或者在 ApplicationRunner 中执行发送。
我在 Webflux + Spring Cloud Stream 功能风格中遇到了类似的问题。 Spring Cloud Function in 2022 是首选方式。
经过大量调试后,我的假设是 bean 的创建顺序不正确。在 kafka 消息处理开始之前,该 bean 可能未在 spring-cloud-stream 的调度程序中注册。类似于@gary提到的。
所以我在我的消费者 bean 之前添加了 @Order(1)
。希望在 dispatcher-registrations 启动之前创建此 bean。
@Bean
@Order(1)
public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {
这似乎暂时解决了我的问题。
我的应用程序成功发送了 Kafka 消息,但仅在 Kafka 初始化之后。在此之前,我收到错误 "Dispatcher has no subscribers"。我如何等待订阅者完成频道注册?
这是事件顺序的轨迹(时间在 second.ms 中):
- 17.165 创建了 SenderClass
- 17.816初始化class,@PostConstruct启动PollingTask
- 24.781 PollingTask 发送第一条 Kafka 消息
- 24.816 第一个错误:"Dispatcher has no subscribers"
- 25.778 正在注册 MessageChannel 我的频道
- 仍然看到调度程序错误
- 27.067 频道我的频道'有 1 个订阅者
- 此后没有更多错误,消息发送正常
我不确定如何处理这个问题。疯狂的猜测包括:
- 将发送代码放在@PostConstruct
- 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到发件人
- 将@AutoConfigureAfter(BindingServiceConfiguration.class) 添加到 SenderClass
- 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到 Main
- 将@DependsOn({"EnableBindingClass"}) 放在任务上
- 将 @DependsOn({"ApplicationLifeCycle"}) 放在 SchedulerClass 上,其中 ApplicationLifeCycle 是一个 class,除了 使用返回 MAX_INT 的 getPhase 实现 SmartLifecycle
- 确保对整个包启用 ComponentScan(来自其他 SO 线程的建议)
- 以上的各种组合
创建了一个新的应用程序,尽可能简单:
public interface Source {
@Output(channelName)
MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
@Autowired
private Source source;
public boolean send(SomeObject object) {
return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
}
@Service
public class Scheduler {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@PostConstruct
public void initialize() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}
编辑以添加解决方案
现在可以使用了!在我的启动发送消息的调度程序中,我从 @PostConstruct 中的启动切换到 SmartLifecycle::start().
@Service
public class Scheduler implements SmartLifecycle {
@Autowired
Sender sender;
@Autowired
ThreadPoolTaskScheduler taskScheduler;
@Override
public void start() {
taskScheduler.schedule(new PollingTask(), nextTime);
}
private class PollingTask implements Runnable {
@Override
public void run() {
List<SomeObject> objects = getDummyData();
for(SomeObject object : objects)
{
sender.send(interval);
}
Instant nextTime = Instant.now().plusMillis(1_000L);
try {
taskScheduler.schedule(new PollingTask(), nextTime);
} catch (Exception e) {
logger.error(e);
}
}
}
@PostConstruct 发送消息太早;上下文仍在构建中。实施 SmartLifecycle,将 bean 置于高阶段 (Integer.MAX_VALUE) 并在 start() 中执行发送。
或者在 ApplicationRunner 中执行发送。
我在 Webflux + Spring Cloud Stream 功能风格中遇到了类似的问题。 Spring Cloud Function in 2022 是首选方式。
经过大量调试后,我的假设是 bean 的创建顺序不正确。在 kafka 消息处理开始之前,该 bean 可能未在 spring-cloud-stream 的调度程序中注册。类似于@gary提到的。
所以我在我的消费者 bean 之前添加了 @Order(1)
。希望在 dispatcher-registrations 启动之前创建此 bean。
@Bean
@Order(1)
public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {
这似乎暂时解决了我的问题。