Using ConnectableFlux for a hot stream REST endpoint

I am trying to create a REST endpoint for subscribing to a hot stream using Reactor.

My test stream provider looks like this:

@Service
class TestProvider {
  fun getStream(resourceId: String): ConnectableFlux<QData> {
    return Flux.create<QData> {
      for (i in 1..10) {
        it.next(QData(LocalDateTime.now().toString(), "next"))
        Thread.sleep(500L)
      }
      it.complete()
    }.publish()
  }
}

My service for the REST endpoint looks like this:

@Service
class DataService @Autowired constructor(
  private val prv: TestProvider
) {

  private val streams = mutableMapOf<String, ConnectableFlux<QData>>()

  fun subscribe(resourceId: String): Flux<QData> {
    val stream = getStream(resourceId)
    return Flux.push { flux ->
      stream.subscribe{
        flux.next(it)
      }
      flux.complete()
    }
  }

  private fun getStream(resourceId: String): ConnectableFlux<QData> {
    if(streams.containsKey(resourceId).not()) {
      streams.put(resourceId, createStream(resourceId))
    }
    return streams.get(resourceId)!!
  }

  private fun createStream(resourceId: String): ConnectableFlux<QData> {
    val stream = prv.getStream(resourceId)
    stream.connect()
    return stream
  }
}

The controller looks like this:

@RestController
class DataController @Autowired constructor(
  private val dataService: DataService
): DataApi {

  override fun subscribe(resourceId: String): Flux<QData> {
    return dataService.subscribe(resourceId)
  }
}

The API interface looks like this:

interface DataApi {

  @ApiResponses(value = [
    ApiResponse(responseCode = "202", description = "Client is subscribed", content = [
      Content(mediaType = "application/json", array = ArraySchema(schema = Schema(implementation = QData::class)))
    ])
  ])
  @GetMapping(path = ["/subscription/{resourceId}"], produces = [MediaType.APPLICATION_JSON_VALUE])
  fun subscribe(
    @Parameter(description = "The resource id for which quality data is subscribed for", required = true, example = "example",allowEmptyValue = false)
    @PathVariable("resourceId", required = true) @NotEmpty resourceId: String
  ): Flux<QData>
}

Unfortunately, my curl request returns an empty array.

Does anyone know what the problem is? Thanks in advance!

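I had to run connect() asynchronously in DataService. connect() subscribes to the source on the calling thread, so the blocking loop inside Flux.create ran to completion inside createStream() before any client had subscribed: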

    CompletableFuture.runAsync {
      stream.connect()
    }
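
For illustration, here is a minimal standalone sketch of the same idea outside Spring. It assumes only reactor-core on the classpath; blockingStream() and collectWithAsyncConnect() are hypothetical names standing in for TestProvider.getStream() and the service logic, and the subscriber is registered before connect() so that it does not depend on timing.

```kotlin
import reactor.core.publisher.ConnectableFlux
import reactor.core.publisher.Flux
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for TestProvider.getStream(): a blocking producer behind publish().
fun blockingStream(): ConnectableFlux<String> =
    Flux.create<String> { sink ->
        for (i in 1..5) {
            sink.next("item $i")
            Thread.sleep(100L) // blocks whichever thread called connect()
        }
        sink.complete()
    }.publish()

// Hypothetical helper: register a subscriber first, then connect on another thread.
fun collectWithAsyncConnect(): List<String> {
    val stream = blockingStream()
    val received = CopyOnWriteArrayList<String>()
    val finished = CountDownLatch(1)

    // Subscribing to a ConnectableFlux does not start the source yet.
    stream.subscribe(
        { item -> received.add(item) },   // onNext
        { _ -> finished.countDown() },    // onError
        { finished.countDown() }          // onComplete
    )

    // connect() would run the blocking loop on this thread, so hand it to another one.
    CompletableFuture.runAsync { stream.connect() }

    // Wait for the stream to terminate (with a safety timeout) and return what arrived.
    finished.await(5, TimeUnit.SECONDS)
    return received.toList()
}

fun main() {
    println(collectWithAsyncConnect())
}
```

In a real endpoint the waiting with a latch would not be needed; it is only there so that the sketch can return the collected items from a plain function.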