How to go from a blocking Vert.x future to an RxJava Observable

I came across code like this in our codebase:

    public Observable<Optional<DeviceInfo>> getDeviceInfo(final String userAgent) {
        final ObservableFuture<Optional<DeviceInfo>> observable = RxHelper.observableFuture();
        vertx.executeBlocking(future -> {
            try {
                final Optional<Device> device = Optional.ofNullable(engine.get().getDeviceForRequest(userAgent));
                if (device.isPresent()) {
                    future.complete(Optional.of(new DeviceInfo()));
                } else {
                    future.complete(Optional.empty());
                }
            } catch (final RuntimeException e) {
                LOGGER.error("Unable to get the UA device info {}, reason {}", userAgent, e.getMessage());
                future.fail(e.getMessage());
            }
        }, observable.toHandler());

        return observable.single();
    }

To me, it seems a bit odd to write this much code just to run the blocking call and map the future to a single-item Observable.

Isn't there a simpler, better way to do this? For example, some convenient factory method?

With the Vert.x API for RxJava and Optional.map:

    public Single<Optional<DeviceInfo>> getDeviceInfo(final String userAgent) {
        return vertx.rxExecuteBlocking(future -> {
            try {
                final Optional<Device> device = Optional.ofNullable(engine.get().getDeviceForRequest(userAgent));
                future.complete(device.map(d -> new DeviceInfo()));
            } catch (final RuntimeException e) {
                LOGGER.error("Unable to get the UA device info {}, reason {}", userAgent, e.getMessage());
                future.fail(e.getMessage());
            }
        });
    }
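
The Optional.map step is what collapses the if/else branch from the question into a single expression. Below is a minimal, stdlib-only sketch of that equivalence. It assumes nothing about Vert.x: `Device` and `DeviceInfo` here are empty, hypothetical stand-ins for the real classes, and `viaIfElse` / `viaMap` are illustrative names that do not appear in the original code.

```java
import java.util.Optional;

public class OptionalMapSketch {
    // Hypothetical stand-ins for the types used in the question.
    static final class Device {}
    static final class DeviceInfo {}

    // The if/else form, as written in the question.
    static Optional<DeviceInfo> viaIfElse(final Device rawDevice) {
        final Optional<Device> device = Optional.ofNullable(rawDevice);
        if (device.isPresent()) {
            return Optional.of(new DeviceInfo());
        } else {
            return Optional.empty();
        }
    }

    // The same logic with Optional.map: an empty Optional stays empty,
    // a present one is mapped to a new DeviceInfo.
    static Optional<DeviceInfo> viaMap(final Device rawDevice) {
        return Optional.ofNullable(rawDevice).map(d -> new DeviceInfo());
    }

    public static void main(final String[] args) {
        // Compare both forms for a missing and a present device.
        System.out.println(viaIfElse(null).isPresent() == viaMap(null).isPresent());
        System.out.println(viaIfElse(new Device()).isPresent() == viaMap(new Device()).isPresent());
    }
}
```

Both forms agree for a null and a non-null device, so the shorter one can be passed straight to `future.complete(...)` as in the answer.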