从内部订阅 flux 在 Spring webFlux java 中订阅

Subscribe to flux from inside subscribe in Spring webFlux java

我已经使用 spring 反应器库编写了一个逻辑来获取所有操作员,然后在异步模式下为每个操作员(分页)获取所有设备。

创建了一个 flux 以获取所有运算符,然后订阅它。

    final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
    operatorDetailsFlux
        .subscribe(operatorDetailsList -> {
          for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
          }
        });

现在,对于每个运营商,我正在获取需要多个订阅才能获得设备单声道的设备,通过订阅单声道来异步获取所有页面。

private void getAndCacheDevicesForOperator(final int operatorId) {
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);

    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
      final PaginatedResponseEntity PaginatedResponseEntity = deviceListResponseEntity.getData();
      final long totalDevicesInOperator = PaginatedResponseEntity.getTotalCount();


      int deviceCount = PaginatedResponseEntity.getCount();
      while (deviceCount < totalDevicesInOperator) {
        final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
            operatorId, deviceCount);

        deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
          final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
              .getItems();
          // work on devices
        });

        deviceCount += DEVICE_PAGE_SIZE;
      }
    });
  }

这段代码工作正常。但我的问题是从订阅内部订阅单声道是个好主意吗?

我将其分解为两个流程,第一个是获取所有操作员,然后是每个操作员的所有设备。

对于分页,我使用 Flux.expand 提取所有页面。

public Flux<OperatorDetails> getAllOperators() {
  return getOperatorsMonoWithRetryAndErrorSpec(0)
      .expand(paginatedResponse -> {
        final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
        if (morePagesAvailable(operatorDetailsPage) {
          return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

public Flux<Device> getAllDevices(final int opId, final int offset) {
  return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
      .expand(paginatedResponse -> {
        final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
        if (morePagesAvailabile(deviceDetailsPage)) {
          return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
              deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

最后我创建了一个管道并订阅它以触发管道。

operatorDetailsFlux
    .flatMap(operatorDetails -> {
        return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
    })
    .subscribe(deviceDetails -> {
      // act on devices
    });