使用 RxScala 进行响应式编程
Reactive Programming using RxScala
我有一个通过套接字协议连接到服务的 Observable。到套接字的连接是通过客户端库进行的。我使用的客户端库有 java.util.Observer 和 我可以注册推送到其中的事件
final class MyObservable extends Observable[MyEvent] {
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
}
}
我有两个我不明白的悬而未决的问题。
如何在订阅者中获取步骤 3 的结果?
每次当我收到 MyEvent 时,订阅者如下所示,我都会看到正在创建一个新连接。最终,对于每个传入事件,第 1 步、第 2 步和第 3 步 运行。
val myObservable = new MyObservale()
myObservable.subscribe()
除非我误解了你的问题,否则你只需拨打 onNext
:
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
// finally notify the subscriber:
subscriber.onNext(myEventFromStep3)
}
订阅的代码会做类似的事情:
myObservable.subscribe(onNext = println(_))
我有一个通过套接字协议连接到服务的 Observable。到套接字的连接是通过客户端库进行的。我使用的客户端库有 java.util.Observer 和 我可以注册推送到其中的事件
final class MyObservable extends Observable[MyEvent] {
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
}
}
我有两个我不明白的悬而未决的问题。
如何在订阅者中获取步骤 3 的结果?
每次当我收到 MyEvent 时,订阅者如下所示,我都会看到正在创建一个新连接。最终,对于每个传入事件,第 1 步、第 2 步和第 3 步 运行。
val myObservable = new MyObservale()
myObservable.subscribe()
除非我误解了你的问题,否则你只需拨打 onNext
:
def subscribe(subscriber: Subscriber[MyEvent]) = {
// connect to the Socket (Step: 1)
// get the responses that are pushed (Step: 2)
// transform them into MyEvent type (Step: 3)
// finally notify the subscriber:
subscriber.onNext(myEventFromStep3)
}
订阅的代码会做类似的事情:
myObservable.subscribe(onNext = println(_))