将 ConnectableFlux 用于热流 REST 端点
Using ConnectableFlux for hot stream REST endpoint
我正在尝试创建一个 REST 端点以使用 Reactor 订阅热流。
我的流测试提供程序如下所示:
@Service
class TestProvider {
fun getStream(resourceId: String): ConnectableFlux<QData> {
return Flux.create<QData> {
for (i in 1..10) {
it.next(QData(LocalDateTime.now().toString(), "next"))
Thread.sleep(500L)
}
it.complete()
}.publish()
}
}
我的 REST 端点服务如下所示:
@Service
class DataService @Autowired constructor(
private val prv: TestProvider
) {
private val streams = mutableMapOf<String, ConnectableFlux<QData>>()
fun subscribe(resourceId: String): Flux<QData> {
val stream = getStream(resourceId)
return Flux.push { flux ->
stream.subscribe{
flux.next(it)
}
flux.complete()
}
}
private fun getStream(resourceId: String): ConnectableFlux<QData> {
if(streams.containsKey(resourceId).not()) {
streams.put(resourceId, createStream(resourceId))
}
return streams.get(resourceId)!!
}
private fun createStream(resourceId: String): ConnectableFlux<QData> {
val stream = prv.getStream(resourceId)
stream.connect()
return stream
}
}
控制器看起来像这样:
@RestController
class DataController @Autowired constructor(
private val dataService: DataService
): DataApi {
override fun subscribe(resourceId: String): Flux<QData> {
return dataService.subscribe(resourceId)
}
}
API 界面如下所示:
interface DataApi {
@ApiResponses(value = [
ApiResponse(responseCode = "202", description = "Client is subscribed", content = [
Content(mediaType = "application/json", array = ArraySchema(schema = Schema(implementation = QData::class)))
])
])
@GetMapping(path = ["/subscription/{resourceId}"], produces = [MediaType.APPLICATION_JSON_VALUE])
fun subscribe(
@Parameter(description = "The resource id for which quality data is subscribed for", required = true, example = "example",allowEmptyValue = false)
@PathVariable("resourceId", required = true) @NotEmpty resourceId: String
): Flux<QData>
}
不幸的是,我的 curl 提供了一个空数组。
有人知道问题出在哪里吗?提前致谢!
我不得不 运行 connect()
在 DataService
中异步:
CompletableFuture.runAsync {
stream.connect()
}
我正在尝试创建一个 REST 端点以使用 Reactor 订阅热流。
我的流测试提供程序如下所示:
@Service
class TestProvider {
fun getStream(resourceId: String): ConnectableFlux<QData> {
return Flux.create<QData> {
for (i in 1..10) {
it.next(QData(LocalDateTime.now().toString(), "next"))
Thread.sleep(500L)
}
it.complete()
}.publish()
}
}
我的 REST 端点服务如下所示:
@Service
class DataService @Autowired constructor(
private val prv: TestProvider
) {
private val streams = mutableMapOf<String, ConnectableFlux<QData>>()
fun subscribe(resourceId: String): Flux<QData> {
val stream = getStream(resourceId)
return Flux.push { flux ->
stream.subscribe{
flux.next(it)
}
flux.complete()
}
}
private fun getStream(resourceId: String): ConnectableFlux<QData> {
if(streams.containsKey(resourceId).not()) {
streams.put(resourceId, createStream(resourceId))
}
return streams.get(resourceId)!!
}
private fun createStream(resourceId: String): ConnectableFlux<QData> {
val stream = prv.getStream(resourceId)
stream.connect()
return stream
}
}
控制器看起来像这样:
@RestController
class DataController @Autowired constructor(
private val dataService: DataService
): DataApi {
override fun subscribe(resourceId: String): Flux<QData> {
return dataService.subscribe(resourceId)
}
}
API 界面如下所示:
interface DataApi {
@ApiResponses(value = [
ApiResponse(responseCode = "202", description = "Client is subscribed", content = [
Content(mediaType = "application/json", array = ArraySchema(schema = Schema(implementation = QData::class)))
])
])
@GetMapping(path = ["/subscription/{resourceId}"], produces = [MediaType.APPLICATION_JSON_VALUE])
fun subscribe(
@Parameter(description = "The resource id for which quality data is subscribed for", required = true, example = "example",allowEmptyValue = false)
@PathVariable("resourceId", required = true) @NotEmpty resourceId: String
): Flux<QData>
}
不幸的是,我的 curl 提供了一个空数组。
有人知道问题出在哪里吗?提前致谢!
我不得不 运行 connect()
在 DataService
中异步:
CompletableFuture.runAsync {
stream.connect()
}