Quarkus: java.lang.IllegalStateException: No subscriber found for the channel
I am using Quarkus and I have a slow-running imperative process:
@ApplicationScoped
public class SlowApplication {
    private static final Logger LOGGER = Logger.getLogger(SlowApplication.class.getName());

    @Inject @Channel("texto") public Emitter<String> emitter;

    public Integer slowProcess(int num) {
        for (int n=0;n<num;n++) {
            LOGGER.info("Start Num: "+n);
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            emitter.send("Number is: "+n);
            LOGGER.info("End Num: "+n);
        }
        return num;
    }
}
I want to get updates from the process through a REST API:
@Path("/test")
@ApplicationScoped
public class MainWeb {
    @Inject
    SlowApplication app;

    @Inject
    Bridge bridge;

    @Inject @Channel("texto-sent") public Multi<String> textos;

    @GET
    @Produces(MediaType.TEXT_HTML)
    public String hello() {
        return "hello";
    }

    @GET
    @Produces(MediaType.TEXT_HTML)
    @Path("/custom")
    public Uni<String> custom() {
        return Uni.createFrom().item(app.slowProcess(4)).onItem().transform(n -> String.format("Finished %s", n));
    }

    @GET
    @Produces(MediaType.TEXT_HTML)
    @Path("/stream")
    public Multi<String> stream() {
        return textos;
    }
}

@ApplicationScoped
public class Bridge {
    private static final Logger LOGGER = Logger.getLogger(Bridge.class.getName());

    // Forwards every message from "texto" to "texto-sent".
    @Outgoing("texto-sent")
    @Incoming("texto")
    public String receive(String str) {
        LOGGER.infof("Received %s", str);
        return "Received " + str;
    }
}
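The following error occurs when execution reaches:
emitter.send("Number is: "+n);
java.lang.IllegalStateException: SRMSG00027: No subscriber found for the channel texto
Link to the GitHub code (including the pom and dependency versions)
Any idea why @Incoming("texto") is not recognized?
Full error: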
java.lang.IllegalStateException: SRMSG00027: No subscriber found for the channel texto
at io.smallrye.reactive.messaging.extension.AbstractEmitter.verify(AbstractEmitter.java:157)
at io.smallrye.reactive.messaging.extension.AbstractEmitter.emit(AbstractEmitter.java:139)
at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:29)
at org.jds.sandbox.createUni.SlowApplication.slowProcess(SlowApplication.java:25)
at org.jds.sandbox.createUni.SlowApplication_Subclass.slowProcess$$superaccessor1(SlowApplication_Subclass.zig:204)
at org.jds.sandbox.createUni.SlowApplication_Subclass$$function$.apply(SlowApplication_Subclass$$function$.zig:35)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:63)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at org.jds.sandbox.createUni.SlowApplication_Subclass.slowProcess(SlowApplication_Subclass.zig:161)
at org.jds.sandbox.createUni.SlowApplication_ClientProxy.slowProcess(SlowApplication_ClientProxy.zig:128)
at org.jds.sandbox.createUni.MainWeb.custom(MainWeb.java:37)
at org.jds.sandbox.createUni.MainWeb_Subclass.custom$$superaccessor1(MainWeb_Subclass.zig:237)
at org.jds.sandbox.createUni.MainWeb_Subclass$$function$.apply(MainWeb_Subclass$$function$.zig:29)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:63)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at org.jds.sandbox.createUni.MainWeb_Subclass.custom(MainWeb_Subclass.zig:195)
at org.jds.sandbox.createUni.MainWeb_ClientProxy.custom(MainWeb_ClientProxy.zig:213)
at org.jds.sandbox.createUni.MainWeb$quarkusrestinvoker$custom_0989e6157a2c7c5acc3973ff53d46eb621670726.invoke(MainWeb$quarkusrestinvoker$custom_0989e6157a2c7c5acc3973ff53d46eb621670726.zig:33)
at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:7)
at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:122)
at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:47)
at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:17)
at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:7)
at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1038)
at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:137)
at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:132)
at io.quarkus.vertx.http.runtime.StaticResourcesRecorder.lambda$start(StaticResourcesRecorder.java:65)
at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1038)
at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:101)
at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:132)
at io.vertx.ext.web.handler.impl.StaticHandlerImpl.lambda$sendStatic(StaticHandlerImpl.java:206)
at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:327)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:835)
The way to fix the problem was:
- Use MutinyEmitter instead of Emitter
- Use emitter.sendAndForget instead of emitter.send
New code for SlowApplication:
@ApplicationScoped
public class SlowApplication {
    private static final Logger LOGGER = Logger.getLogger(SlowApplication.class.getName());

    @Inject @Channel("texto") public MutinyEmitter<String> emitter;

    public Integer slowProcess(int num) {
        for (int n=0;n<num;n++) {
            LOGGER.info("Start Num: "+n);
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            emitter.sendAndForget("Number is: "+n);
            LOGGER.info("End Num: "+n);
        }
        return num;
    }
}
I also changed the stream method in MainWeb to:
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public Multi<String> stream() {
    return textos;
}
Note: I updated to version 1.13.7.Final, as Clement suggested.
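To illustrate the difference between the two calls, here is a toy model in plain Java. It is only a sketch and not SmallRye's implementation: ToyChannel and its methods are hypothetical names, and the behavior of the lenient method is an assumption based on the fix above (the strict send fails when nothing has subscribed yet, the fire-and-forget variant does not propagate that failure). In the original code the likely reason for the missing subscriber is that the injected Multi on "texto-sent" is lazy, so nothing subscribes upstream until a client opens /stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a messaging channel; hypothetical, NOT the SmallRye API. */
public class ToyChannel {
    private final String name;
    // Stays null until something downstream subscribes (e.g. a client opens the stream).
    private Consumer<String> subscriber;

    public ToyChannel(String name) {
        this.name = name;
    }

    public void subscribe(Consumer<String> downstream) {
        this.subscriber = downstream;
    }

    /** Strict send: throws when nobody is listening, similar in spirit to SRMSG00027. */
    public void send(String payload) {
        if (subscriber == null) {
            throw new IllegalStateException("No subscriber found for the channel " + name);
        }
        subscriber.accept(payload);
    }

    /** Lenient send (assumed semantics): drops the payload instead of throwing. */
    public boolean sendAndForget(String payload) {
        try {
            send(payload);
            return true;
        } catch (IllegalStateException e) {
            return false; // nobody subscribed yet, the message is lost
        }
    }

    public static void main(String[] args) {
        ToyChannel texto = new ToyChannel("texto");
        System.out.println("delivered before subscribe: " + texto.sendAndForget("Number is: 0"));

        List<String> received = new ArrayList<>();
        texto.subscribe(received::add); // comparable to a client connecting to /stream
        texto.send("Number is: 1");
        System.out.println("received after subscribe: " + received);
    }
}
```

Under this model, messages emitted before anyone subscribes are simply lost, so a client that wants every update should be connected to /stream before /custom is called.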