异常:org.springframework.messaging.MessageDeliveryException:Dispatcher 没有频道订阅者
Exception: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel
我有一个沙箱用于探索 Spring Cloud Stream 中新添加的功能,但我在一个 Spring Cloud Stream 应用程序中使用 Function 和 Supplier 时遇到了问题。
在代码中,我使用了 docs 中描述的示例。
首先,我在 application.yml
中添加了对应的 spring.cloud.stream.bindings
和 spring.cloud.stream.function.definition
属性的项目 Function<String, String>
。一切正常,我 post 向 my-fun-in
Kafka 主题发送消息,应用程序执行功能并将结果发送到 my-fun-out
主题。
然后我将 Supplier<Flux<String>>
添加到具有相应 spring.cloud.stream.bindings
的同一项目,并将 spring.cloud.stream.function.definition
值更新为 fun;sup
。奇怪的事情开始发生了。当我尝试启动应用程序时,我收到以下错误:
2020-01-15 01:45:16.608 ERROR 10128 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.sup-out-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}], failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:188)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:219)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:57)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:165)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:148)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo(FluxMessageChannel.java:83)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 34 more
之后我尝试了几件事:
- 已将
spring.cloud.stream.function.definition
还原为 fun
(禁用 sup
bean 绑定到外部目标)。应用程序启动,功能正常,供应商不工作。一切如预期。
- 将
spring.cloud.stream.function.definition
更改为 sup
(禁用 fun
bean 绑定到外部目标)。应用程序启动,功能不起作用,供应商工作(每秒向 my-sup-out
主题生成消息)。一切如预期。
- 已将
spring.cloud.stream.function.definition
值更新为 fun;sup
。应用程序未启动,得到相同的 MessageDeliveryException。
- 已将
spring.cloud.stream.function.definition
值替换为 sup;fun
。应用程序已启动,供应商工作,但功能不工作(未向my-fun-out
主题发送消息)。
最后一个比错误更让我困惑)所以现在我需要有人的帮助来解决问题。
我是不是遗漏了什么配置?为什么在 spring.cloud.stream.function.definition
中更改由 ;
分隔的 bean 顺序会导致不同的结果?
完整的项目已上传至 GitHub 并在下方添加:
StreamApplication.java:
package com.kaine;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import java.util.function.Function;
import java.util.function.Supplier;
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class);
}
@Bean
public Function<String, String> fun() {
return value -> value.toUpperCase();
}
@Bean
public Supplier<Flux<String>> sup() {
return () -> Flux.from(emitter -> {
while (true) {
try {
emitter.onNext("Hello from Supplier!");
Thread.sleep(1000);
} catch (Exception e) {
// ignore
}
}
});
}
}
application.yml
spring:
cloud:
stream:
function:
definition: fun;sup
bindings:
fun-in-0:
destination: my-fun-in
fun-out-0:
destination: my-fun-out
sup-out-0:
destination: my-sup-out
build.gradle.kts:
plugins {
java
}
group = "com.kaine"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR1"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation(platform("org.springframework.boot:spring-boot-dependencies:2.2.2.RELEASE"))
}
configure<JavaPluginConvention> {
sourceCompatibility = JavaVersion.VERSION_11
}
实际上这是我们文档的一个问题,因为我认为我们为他的案例提供了被动供应商的一个坏例子。问题是您的 Supplier 处于无限阻塞循环中。基本上从来没有returns。
所以请将其更改为:
@Bean
public Supplier<Flux<String>> sup() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
我有一个沙箱用于探索 Spring Cloud Stream 中新添加的功能,但我在一个 Spring Cloud Stream 应用程序中使用 Function 和 Supplier 时遇到了问题。
在代码中,我使用了 docs 中描述的示例。
首先,我在 application.yml
中添加了对应的 spring.cloud.stream.bindings
和 spring.cloud.stream.function.definition
属性的项目 Function<String, String>
。一切正常,我 post 向 my-fun-in
Kafka 主题发送消息,应用程序执行功能并将结果发送到 my-fun-out
主题。
然后我将 Supplier<Flux<String>>
添加到具有相应 spring.cloud.stream.bindings
的同一项目,并将 spring.cloud.stream.function.definition
值更新为 fun;sup
。奇怪的事情开始发生了。当我尝试启动应用程序时,我收到以下错误:
2020-01-15 01:45:16.608 ERROR 10128 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.sup-out-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}], failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:188)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:219)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:57)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:165)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:148)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo(FluxMessageChannel.java:83)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 34 more
之后我尝试了几件事:
- 已将
spring.cloud.stream.function.definition
还原为fun
(禁用sup
bean 绑定到外部目标)。应用程序启动,功能正常,供应商不工作。一切如预期。 - 将
spring.cloud.stream.function.definition
更改为sup
(禁用fun
bean 绑定到外部目标)。应用程序启动,功能不起作用,供应商工作(每秒向my-sup-out
主题生成消息)。一切如预期。 - 已将
spring.cloud.stream.function.definition
值更新为fun;sup
。应用程序未启动,得到相同的 MessageDeliveryException。 - 已将
spring.cloud.stream.function.definition
值替换为sup;fun
。应用程序已启动,供应商工作,但功能不工作(未向my-fun-out
主题发送消息)。
最后一个比错误更让我困惑)所以现在我需要有人的帮助来解决问题。
我是不是遗漏了什么配置?为什么在 spring.cloud.stream.function.definition
中更改由 ;
分隔的 bean 顺序会导致不同的结果?
完整的项目已上传至 GitHub 并在下方添加:
StreamApplication.java:
package com.kaine;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import java.util.function.Function;
import java.util.function.Supplier;
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class);
}
@Bean
public Function<String, String> fun() {
return value -> value.toUpperCase();
}
@Bean
public Supplier<Flux<String>> sup() {
return () -> Flux.from(emitter -> {
while (true) {
try {
emitter.onNext("Hello from Supplier!");
Thread.sleep(1000);
} catch (Exception e) {
// ignore
}
}
});
}
}
application.yml
spring:
cloud:
stream:
function:
definition: fun;sup
bindings:
fun-in-0:
destination: my-fun-in
fun-out-0:
destination: my-fun-out
sup-out-0:
destination: my-sup-out
build.gradle.kts:
plugins {
java
}
group = "com.kaine"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR1"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation(platform("org.springframework.boot:spring-boot-dependencies:2.2.2.RELEASE"))
}
configure<JavaPluginConvention> {
sourceCompatibility = JavaVersion.VERSION_11
}
实际上这是我们文档的一个问题,因为我认为我们为他的案例提供了被动供应商的一个坏例子。问题是您的 Supplier 处于无限阻塞循环中。基本上从来没有returns。 所以请将其更改为:
@Bean
public Supplier<Flux<String>> sup() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}