来自无限 java 流的通量端点
Flux endpoint from infinite java stream
我在处理从 Stream.generate
构造构建的通量时遇到问题。
Java 流正在从远程源获取一些数据,因此我实现了一个嵌入了数据获取逻辑的自定义供应商,然后用它来填充流。
Stream.generate(new SearchSupplier(...))
我的想法是检测空列表并使用 takeWhile
->
的 Java9 功能
Stream.generate(new SearchSupplier(this, queryBody))
.takeWhile(either -> either.isRight() && either.get().nonEmpty())
(使用 Vavr 的 Either 结构)
repositoroy layer flux 将做:
return Flux.fromStream (
this.searchStream(...) //this is where the stream gets generated
)
.map(Either::get)
.flatMap(Flux::fromIterable);
"service" 层由一些对 flux 的转换步骤组成,但方法签名类似于 Flux<JsonObject> search(...)
。
最后controller层有个GetMapping:
@GetMapping(produces = "application/stream+json")
public Flux search(...) {
return searchService.search(...) //this is the Flux<JsonObject> parth
.subscriberContext(...) //stuff I need available during processing
.doOnComplete(() -> log.debug("DONE"));
}
我的问题是 Flux 似乎永远不会终止。
例如,从 Postman 拨打电话只是在响应部分拍摄了 'Loading...' 部分。当我从我的 IDE 终止进程时,结果会刷新给邮递员,我看到了我所期待的。 doOnComplete lambda 也永远不会被调用
我注意到,如果我更改 Flux 的来源:
Flux.fromArray(...) //harcoded array of lists of jsons
调用 doOnComplete lambda 并关闭 http 连接,结果显示在邮递员中。
知道可能是什么问题吗?
谢谢。
您可以使用如下所示的代码直接创建 Flux。请注意,我正在添加一些您需要根据 SearchSupplier 的工作方式实施的假定方法:
Flux<SearchResultType> flux = Flux.generate(
() -> new SearchSupplier(this, queryBody),
(supplier, sink) -> {
SearchResultType current = supplier.next();
if (isNotLast(current)) {
sink.next(current);
} else {
sink.complete();
}
return supplier;
},
supplier -> anyCleanupOperations(supplier)
);
我在处理从 Stream.generate
构造构建的通量时遇到问题。
Java 流正在从远程源获取一些数据,因此我实现了一个嵌入了数据获取逻辑的自定义供应商,然后用它来填充流。
Stream.generate(new SearchSupplier(...))
我的想法是检测空列表并使用 takeWhile
->
Stream.generate(new SearchSupplier(this, queryBody))
.takeWhile(either -> either.isRight() && either.get().nonEmpty())
(使用 Vavr 的 Either 结构)
repositoroy layer flux 将做:
return Flux.fromStream (
this.searchStream(...) //this is where the stream gets generated
)
.map(Either::get)
.flatMap(Flux::fromIterable);
"service" 层由一些对 flux 的转换步骤组成,但方法签名类似于 Flux<JsonObject> search(...)
。
最后controller层有个GetMapping:
@GetMapping(produces = "application/stream+json")
public Flux search(...) {
return searchService.search(...) //this is the Flux<JsonObject> parth
.subscriberContext(...) //stuff I need available during processing
.doOnComplete(() -> log.debug("DONE"));
}
我的问题是 Flux 似乎永远不会终止。 例如,从 Postman 拨打电话只是在响应部分拍摄了 'Loading...' 部分。当我从我的 IDE 终止进程时,结果会刷新给邮递员,我看到了我所期待的。 doOnComplete lambda 也永远不会被调用
我注意到,如果我更改 Flux 的来源:
Flux.fromArray(...) //harcoded array of lists of jsons
调用 doOnComplete lambda 并关闭 http 连接,结果显示在邮递员中。
知道可能是什么问题吗?
谢谢。
您可以使用如下所示的代码直接创建 Flux。请注意,我正在添加一些您需要根据 SearchSupplier 的工作方式实施的假定方法:
Flux<SearchResultType> flux = Flux.generate(
() -> new SearchSupplier(this, queryBody),
(supplier, sink) -> {
SearchResultType current = supplier.next();
if (isNotLast(current)) {
sink.next(current);
} else {
sink.complete();
}
return supplier;
},
supplier -> anyCleanupOperations(supplier)
);