如何将基于 API 的回调转换为基于 Observable 的回调?
How to convert callback based API into one based on Observable?
我正在使用的库使用回调对象发出一系列 Message
对象。
interface MessageCallback {
onMessage(Message message);
}
使用一些 libraryObject.setCallback(MessageCallback)
调用添加回调,并使用非阻塞 libraryObject.start()
方法调用启动进程。
创建将发出这些对象的 Observable<Message>
的最佳方法是什么?
如果 libraryObject.start()
阻塞怎么办?
我想你需要这样的东西(scala 中给出的例子)
import rx.lang.scala.{Observable, Subscriber}
case class Message(message: String)
trait MessageCallback {
def onMessage(message: Message)
}
object LibraryObject {
def setCallback(callback: MessageCallback): Unit = {
???
}
def removeCallback(callback: MessageCallback): Unit = {
???
}
def start(): Unit = {
???
}
}
def messagesSource: Observable[Message] =
Observable((subscriber: Subscriber[Message]) ⇒ {
val callback = new MessageCallback {
def onMessage(message: Message) {
subscriber.onNext(message)
}
}
LibraryObject.setCallback(callback)
subscriber.add {
LibraryObject.removeCallback(callback)
}
})
至于blocking/non-blocking start()
:通常基于回调的架构将回调订阅和进程启动分开。在这种情况下,您可以完全独立于 start()
过程的时间来创建任意数量的 messageSource
。此外,是否分叉完全由您决定。您的架构与此不同吗?
您还应该以某种方式处理完成该过程。最好的办法是向 MessageCallback 接口添加一个 onCompleted
处理程序。如果你想处理错误,还要添加一个 onError
处理程序。现在看,你刚刚声明了 RxJava 的基石,一个 Observer :-)
1。回调调用无限次
我们可以像这样将它转换为 Observable(以 RxJava 2 为例):
Observable<Message> source = Observable.create(emitter -> {
MessageCallback callback = message -> emitter.onNext(message);
libraryObject.setCallback(callback);
Schedulers.io().scheduleDirect(libraryObject::start);
emitter.setCancellable(() -> libraryObject.removeCallback(callback));
})
.share(); // make it hot
share
使这个可观察 hot,即多个订阅者将共享一个订阅,即最多会有一个回调注册到 libraryObject
。
我使用 io
调度程序来安排从后台线程进行的 start
调用,因此它不会延迟第一次订阅。
2。单条消息回调
这也是很常见的场景。假设我们有以下 callback-style 异步方法:
libraryObject.requestDataAsync(Some parameters, MessageCallback callback);
然后我们可以像这样将它转换为 Observable(以 RxJava 2 为例):
Observable<Message> makeRequest(parameters) {
return Observable.create(emitter -> {
libraryObject.requestDataAsync(parameters, message -> {
emitter.onNext(message);
emitter.onComplete();
});
});
}
我正在使用的库使用回调对象发出一系列 Message
对象。
interface MessageCallback {
onMessage(Message message);
}
使用一些 libraryObject.setCallback(MessageCallback)
调用添加回调,并使用非阻塞 libraryObject.start()
方法调用启动进程。
创建将发出这些对象的 Observable<Message>
的最佳方法是什么?
如果 libraryObject.start()
阻塞怎么办?
我想你需要这样的东西(scala 中给出的例子)
import rx.lang.scala.{Observable, Subscriber}
case class Message(message: String)
trait MessageCallback {
def onMessage(message: Message)
}
object LibraryObject {
def setCallback(callback: MessageCallback): Unit = {
???
}
def removeCallback(callback: MessageCallback): Unit = {
???
}
def start(): Unit = {
???
}
}
def messagesSource: Observable[Message] =
Observable((subscriber: Subscriber[Message]) ⇒ {
val callback = new MessageCallback {
def onMessage(message: Message) {
subscriber.onNext(message)
}
}
LibraryObject.setCallback(callback)
subscriber.add {
LibraryObject.removeCallback(callback)
}
})
至于blocking/non-blocking start()
:通常基于回调的架构将回调订阅和进程启动分开。在这种情况下,您可以完全独立于 start()
过程的时间来创建任意数量的 messageSource
。此外,是否分叉完全由您决定。您的架构与此不同吗?
您还应该以某种方式处理完成该过程。最好的办法是向 MessageCallback 接口添加一个 onCompleted
处理程序。如果你想处理错误,还要添加一个 onError
处理程序。现在看,你刚刚声明了 RxJava 的基石,一个 Observer :-)
1。回调调用无限次
我们可以像这样将它转换为 Observable(以 RxJava 2 为例):
Observable<Message> source = Observable.create(emitter -> {
MessageCallback callback = message -> emitter.onNext(message);
libraryObject.setCallback(callback);
Schedulers.io().scheduleDirect(libraryObject::start);
emitter.setCancellable(() -> libraryObject.removeCallback(callback));
})
.share(); // make it hot
share
使这个可观察 hot,即多个订阅者将共享一个订阅,即最多会有一个回调注册到 libraryObject
。
我使用 io
调度程序来安排从后台线程进行的 start
调用,因此它不会延迟第一次订阅。
2。单条消息回调
这也是很常见的场景。假设我们有以下 callback-style 异步方法:
libraryObject.requestDataAsync(Some parameters, MessageCallback callback);
然后我们可以像这样将它转换为 Observable(以 RxJava 2 为例):
Observable<Message> makeRequest(parameters) {
return Observable.create(emitter -> {
libraryObject.requestDataAsync(parameters, message -> {
emitter.onNext(message);
emitter.onComplete();
});
});
}