Mono<> 与 Spring Cloud Stream

Mono<> with Spring Cloud Stream

我 运行 在尝试将 Reactor 中的 Mono 与 Spring Cloud Stream 结合使用时遇到问题,无法真正弄清楚发生了什么。

假设我有一个这样的听众:

@StreamListener
@Output(Urls.OUTUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Flux<String> urlFormats)
{
    return urlFormats
        .map(this::expandUrl)
        .flatMapIterable(urls -> urls);
}

所以它基本上是将像这样 http://www.example.com/page/%d 格式的 url 扩展成像这样

http://www.example.com/page/1

http://www.example.com/page/2

http://www.example.com/page/3

它按预期工作,但是当我尝试这样做时:

@StreamListener
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(@Input(Urls.INPUT) Mono<String> urlFormats)
{
    return urlFormats
        .repeat(3)
        .zipWith(pageNumbers)
        .map(this::formatUrl);
}

其中 pageNumber 是 Flux.fromStream(Stream.iterate(1, p -> p+1).limit(3))

我得到以下异常

Caused by: java.lang.IllegalArgumentException: A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.

我通过这样做摆脱了异常

@StreamListener(value = Urls.INPUT)
@Output(Urls.OUTPUT)
public Flux<String> expandUrls(Mono<String> urlFormats)
{
    return urlFormats
        .repeat(3)
        .zipWith(pageNumbers)
        .map(this::formatUrl);
}

但现在我明白了:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'http': was expecting ('true', 'false' or 'null')
 at [Source: http://www.example.com/page-%d,1,0.html;

我的问题是:如何将 Mono 与 Spring Cloud Stream 一起使用。甚至可以像这样使用它吗?如果是,那么该怎么做? 哦,我正在使用 Kafka 作为 kafka-starter 的代理。

Spring Cloud Stream StreamListener 的 @Input 参数类型支持反应器类型 Flux 只是因为它比在 @Input 参数类型上具有 Mono 更适合反应式流应用程序。