如何将基于 API 的回调转换为基于 Observable 的回调?

How to convert callback based API into one based on Observable?

我正在使用的库使用回调对象发出一系列 Message 对象。

interface MessageCallback {
    onMessage(Message message);
}

使用一些 libraryObject.setCallback(MessageCallback) 调用添加回调,并使用非阻塞 libraryObject.start() 方法调用启动进程。

创建将发出这些对象的 Observable<Message> 的最佳方法是什么?

如果 libraryObject.start() 阻塞怎么办?

我想你需要这样的东西(scala 中给出的例子)

import rx.lang.scala.{Observable, Subscriber}

case class Message(message: String)

trait MessageCallback {
  def onMessage(message: Message)
}

object LibraryObject {
  def setCallback(callback: MessageCallback): Unit = {
    ???
  }

  def removeCallback(callback: MessageCallback): Unit = {
    ???
  }

  def start(): Unit = {
    ???
  }
}

def messagesSource: Observable[Message] =
  Observable((subscriber: Subscriber[Message]) ⇒ {
    val callback = new MessageCallback {
      def onMessage(message: Message) {
        subscriber.onNext(message)
      }
    }
    LibraryObject.setCallback(callback)
    subscriber.add {
      LibraryObject.removeCallback(callback)
    }
  })

至于blocking/non-blocking start():通常基于回调的架构将回调订阅和进程启动分开。在这种情况下,您可以完全独立于 start() 过程的时间来创建任意数量的 messageSource。此外,是否分叉完全由您决定。您的架构与此不同吗?

您还应该以某种方式处理完成该过程。最好的办法是向 MessageCallback 接口添加一个 onCompleted 处理程序。如果你想处理错误,还要添加一个 onError 处理程序。现在看,你刚刚声明了 RxJava 的基石,一个 Observer :-)

1。回调调用无限次

我们可以像这样将它转换为 Observable(以 RxJava 2 为例):

Observable<Message> source = Observable.create(emitter -> {
        MessageCallback callback = message -> emitter.onNext(message);
        libraryObject.setCallback(callback);
        Schedulers.io().scheduleDirect(libraryObject::start);
        emitter.setCancellable(() -> libraryObject.removeCallback(callback));
    })
    .share(); // make it hot

share 使这个可观察 hot,即多个订阅者将共享一个订阅,即最多会有一个回调注册到 libraryObject

我使用 io 调度程序来安排从后台线程进行的 start 调用,因此它不会延迟第一次订阅。

2。单条消息回调

这也是很常见的场景。假设我们有以下 callback-style 异步方法:

libraryObject.requestDataAsync(Some parameters, MessageCallback callback);

然后我们可以像这样将它转换为 Observable(以 RxJava 2 为例):

Observable<Message> makeRequest(parameters) {
    return Observable.create(emitter -> {
        libraryObject.requestDataAsync(parameters, message -> {
            emitter.onNext(message);
            emitter.onComplete();
        });
    });
}