反应式@StreamListener
Reactive @StreamListener
快点。
是否可以使用 Spring Cloud Stream 进行反应式 @StreamListener 输入?我的意思是这样的:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log();
}
在文档中我只看到了这样的例子:
@StreamListener(Sink.INPUT)
public void log(String message)
{
log.info(message);
}
或这样的处理器:
@StreamListener
@Output(Source.OUTPUT)
public Flux<String> log(@Input(Sink.INPUT) Flux<String> strings)
{
return strings.map(String::toUpperCase);
}
当我 运行 来自第一个片段的代码时,我得到一个 Dispatcher has no subscribers
异常。
前两个是等价的。如果 运行 遇到错误,请打开 GitHub 问题。
编辑:
这两个注解是等价的,但是如果对输入通量有订阅,监听器只会订阅输入。为了完全等效,反应式应该是:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log().subscribe();
}
或
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.subscribe(log::info);
}
只需在 flux 上调用 log()
即可创建另一个 Flux
而无需自己订阅。
快点。
是否可以使用 Spring Cloud Stream 进行反应式 @StreamListener 输入?我的意思是这样的:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log();
}
在文档中我只看到了这样的例子:
@StreamListener(Sink.INPUT)
public void log(String message)
{
log.info(message);
}
或这样的处理器:
@StreamListener
@Output(Source.OUTPUT)
public Flux<String> log(@Input(Sink.INPUT) Flux<String> strings)
{
return strings.map(String::toUpperCase);
}
当我 运行 来自第一个片段的代码时,我得到一个 Dispatcher has no subscribers
异常。
前两个是等价的。如果 运行 遇到错误,请打开 GitHub 问题。
编辑:
这两个注解是等价的,但是如果对输入通量有订阅,监听器只会订阅输入。为了完全等效,反应式应该是:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log().subscribe();
}
或
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.subscribe(log::info);
}
只需在 flux 上调用 log()
即可创建另一个 Flux
而无需自己订阅。