MessageDispatchingException: Dispatcher has no subscribers

I have a simple Spring Cloud Stream setup.

Interface

public interface MyKafkaBinding {

    @Output(PUBLISHER)
    MessageChannel publisher();

    @Input("subscriber")
    SubscribableChannel subscriber();
}

Binding

@EnableBinding(MyKafkaBinding.class)

Listener

@StreamListener(MyKafkaBinding.PUBLISHER)
public void listen(MyEvent message) {
    // handle
}

Application properties

spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json

spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json

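Everything works: messages sent through the publisher are received.
Now I am trying to send a message to the same topic from another application using KafkaTemplate: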

kafkaTemplate.send(topic, message)

This time the receiving side throws an error:

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
   at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
   at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
   at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
   at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:74)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
   at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
   at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
   at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
   at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
   ... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
   at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
   at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
   at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
   ... 27 common frames omitted

Spring version 5+.
Is this a valid scenario: sending messages with KafkaTemplate and expecting them to be received by a Spring Cloud Stream subscriber?

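Your @StreamListener is bound to the publisher channel instead of the subscriber channel, so nothing is subscribed to the subscriber channel when a record arrives from the topic.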

Here is a working example:

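import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication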
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {

    public static void main(String[] args) {
        SpringApplication.run(So59585815Application.class, args);
    }

    @Autowired
    private MessageChannel publisher;

    @StreamListener("subscriber")
    public void listen(String in) {
        publisher.send(new GenericMessage<>(in.toUpperCase()));
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("subscriber-topic", "foo".getBytes());
        };
    }

    @KafkaListener(id = "listener", topics = "publisher-topic")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

}

interface MyKafkaBinding {

    @Output("publisher")
    MessageChannel publisher();

    @Input("subscriber")
    SubscribableChannel subscriber();

}

spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup

spring.kafka.consumer.auto-offset-reset=earliest
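
To illustrate why the exception names the subscriber channel, here is a minimal plain-Java sketch of the idea. ToyChannel and ToyDemo are hypothetical names invented for this illustration; they are not Spring Integration classes, and the real dispatcher is more involved (for example, it can balance load across several handlers). The sketch only assumes that a channel with no registered handler rejects a message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a subscribable channel; NOT Spring Integration's real class.
class ToyChannel {
    private final String name;
    private final List<Consumer<String>> handlers = new ArrayList<>();

    ToyChannel(String name) {
        this.name = name;
    }

    // Roughly what a listener annotation arranges for the channel it names.
    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Deliver to the first handler, or fail if nobody subscribed.
    void send(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'");
        }
        handlers.get(0).accept(payload);
    }
}

class ToyDemo {
    public static void main(String[] args) {
        ToyChannel publisher = new ToyChannel("publisher");
        ToyChannel subscriber = new ToyChannel("subscriber");

        // The mistake from the question: the listener is attached to the output channel.
        publisher.subscribe(p -> System.out.println("handled " + p));

        // A record from the topic arrives on the input channel, which has no handler.
        try {
            subscriber.send("event");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Attaching the handler to subscriber instead of publisher makes the send succeed, which mirrors changing the @StreamListener argument to "subscriber".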

The reason this problem occurred in my project:

My project could not connect to Kafka.
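
If you suspect the same cause, a quick TCP reachability check can rule it in or out before you dig into the bindings. This is only a sketch: BrokerCheck and canReach are hypothetical names, and localhost:9092 is an assumed broker address that you should replace with whatever your application is configured to use. It confirms only that the port accepts a connection, not that a healthy Kafka broker is behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerCheck {

    // Hypothetical helper: true if a TCP connection to host:port opens within timeoutMs.
    static boolean canReach(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address; substitute your own broker host and port.
        System.out.println("broker reachable: " + canReach("localhost", 9092, 2000));
    }
}
```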