MessageDispatchingException:调度程序没有订阅者
MessageDispatchingException: Dispatcher has no subscribers
进行简单的 Spring Cloud Stream 设置。
界面
public interface MyKafkaBinding {
@Output(PUBLISHER)
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
绑定
@EnableBinding(MyKafkaBinding.class)
听众
@StreamListener(MyKafkaBinding.PUBLISHER)
public void listen(MyEvent message) {
// handle
}
应用程序属性
spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json
spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json
一切正常。收到使用发布者发送的消息。
现在,我正在尝试使用 KafkaTemplate 从另一个应用向该主题发送消息:
kafkaTemplate.send(topic, message)
这次接收端报错:
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=15=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 27 common frames omitted
Spring 版本 5+。
这是否是一个有效的场景,使用 KafkaTemplate 发送消息并期望它们被云流订阅者接收?
您的 @StreamListener
绑定到发布者频道而不是订阅者频道。
这是一个工作示例:
@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {
public static void main(String[] args) {
SpringApplication.run(So59585815Application.class, args);
}
@Autowired
private MessageChannel publisher;
@StreamListener("subscriber")
public void listen(String in) {
publisher.send(new GenericMessage<>(in.toUpperCase()));
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("subscriber-topic", "foo".getBytes());
};
}
@KafkaListener(id = "listener", topics = "publisher-topic")
public void listen(byte[] in) {
System.out.println(new String(in));
}
}
interface MyKafkaBinding {
@Output("publisher")
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
和
spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
我的项目出现这个问题的原因:
我的项目无法连接到 Kafka
进行简单的 Spring Cloud Stream 设置。
界面
public interface MyKafkaBinding {
@Output(PUBLISHER)
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
绑定
@EnableBinding(MyKafkaBinding.class)
听众
@StreamListener(MyKafkaBinding.PUBLISHER)
public void listen(MyEvent message) {
// handle
}
应用程序属性
spring.cloud.stream.bindings.publisher.destination=my-kafka-topic
spring.cloud.stream.bindings.publisher.producer.header-mode=headers
spring.cloud.stream.bindings.publisher.content-type=application/json
spring.cloud.stream.bindings.subscriber.destination=my-kafka-topic
spring.cloud.stream.bindings.subscriber.consumer.header-mode=headers
spring.cloud.stream.bindings.subscriber.content-type=application/json
一切正常。收到使用发布者发送的消息。
现在,我正在尝试使用 KafkaTemplate 从另一个应用向该主题发送消息:
kafkaTemplate.send(topic, message)
这次接收端报错:
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'MyApp.subscriber'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=com.bax.so.MyEvent@6da11fec, headers={b3=[B@304c5b9f, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@742c6888, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my-kafka-topic, kafka_receivedTimestamp=1578085559878, kafka_groupId=my-default-group-id}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=15=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
... 8 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 27 common frames omitted
Spring 版本 5+。
这是否是一个有效的场景,使用 KafkaTemplate 发送消息并期望它们被云流订阅者接收?
您的 @StreamListener
绑定到发布者频道而不是订阅者频道。
这是一个工作示例:
@SpringBootApplication
@EnableBinding(MyKafkaBinding.class)
public class So59585815Application {
public static void main(String[] args) {
SpringApplication.run(So59585815Application.class, args);
}
@Autowired
private MessageChannel publisher;
@StreamListener("subscriber")
public void listen(String in) {
publisher.send(new GenericMessage<>(in.toUpperCase()));
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("subscriber-topic", "foo".getBytes());
};
}
@KafkaListener(id = "listener", topics = "publisher-topic")
public void listen(byte[] in) {
System.out.println(new String(in));
}
}
interface MyKafkaBinding {
@Output("publisher")
MessageChannel publisher();
@Input("subscriber")
SubscribableChannel subscriber();
}
和
spring.cloud.stream.bindings.publisher.destination=publisher-topic
spring.cloud.stream.bindings.subscriber.destination=subscriber-topic
spring.cloud.stream.bindings.subscriber.group=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
我的项目出现这个问题的原因:
我的项目无法连接到 Kafka