Spring Cloud Stream - 第一条 Kafka 消息出错 "Dispatcher has no subscribers"

Spring Cloud Stream - First Kafka messages get error "Dispatcher has no subscribers"

我的应用程序成功发送了 Kafka 消息,但仅在 Kafka 初始化之后。在此之前,我收到错误 "Dispatcher has no subscribers"。我如何等待订阅者完成频道注册?

这是事件顺序的轨迹(时间在 second.ms 中):

我不确定如何处理这个问题。疯狂的猜测包括:

  1. 将发送代码放在@PostConstruct
  2. 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到发件人
  3. 将@AutoConfigureAfter(BindingServiceConfiguration.class) 添加到 SenderClass
  4. 将@AutoConfigureBefore(BindingServiceConfiguration.class) 添加到 Main
  5. 将@DependsOn({"EnableBindingClass"}) 放在任务上
  6. 将 @DependsOn({"ApplicationLifeCycle"}) 放在 SchedulerClass 上,其中 ApplicationLifeCycle 是一个 class,除了 使用返回 MAX_INT
  7. 的 getPhase 实现 SmartLifecycle
  8. 确保对整个包启用 ComponentScan(来自其他 SO 线程的建议)
  9. 以上的各种组合

创建了一个新的应用程序,尽可能简单:

public interface Source {
  @Output(channelName)
  MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
  @Autowired
  private Source source;

  public boolean send(SomeObject object) {
    return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
  }
@Service
public class Scheduler {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @PostConstruct
  public void initialize() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

编辑以添加解决方案

现在可以使用了!在我的启动发送消息的调度程序中,我从 @PostConstruct 中的启动切换到 SmartLifecycle::start().

@Service
public class Scheduler implements SmartLifecycle {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @Override
  public void start() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

@PostConstruct 发送消息太早;上下文仍在构建中。实施 SmartLifecycle,将 bean 置于高阶段 (Integer.MAX_VALUE) 并在 start() 中执行发送。

或者在 ApplicationRunner 中执行发送。

我在 Webflux + Spring Cloud Stream 功能风格中遇到了类似的问题。 Spring Cloud Function in 2022 是首选方式。

经过大量调试后,我的假设是 bean 的创建顺序不正确。在 kafka 消息处理开始之前,该 bean 可能未在 spring-cloud-stream 的调度程序中注册。类似于@gary提到的。

所以我在我的消费者 bean 之前添加了 @Order(1)。希望在 dispatcher-registrations 启动之前创建此 bean。

   ​@Bean
   ​@Order(1)
   ​public Function<Flux<Message<Pojo>>, Mono<Void>> pojoConsumer() {

这似乎暂时解决了我的问题。