重新启动连接到资源的 Observable
Restart Observable connected to a resource
在下面的代码中,我将 TCP 套接字转换为 Observable[Array[Byte]]
:
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
sock
}(
socket => Observable.from[Bytes] {
val incoming = socket.getInputStream
val buffer = new Bytes(1024)
Stream.continually {
val read = incoming.read(buffer, 0, 1024)
buffer.take(read)
}.takeWhile(_.nonEmpty)
},
socket => {
println("Socket disposed")
socket.close
s.retry // Does not work
})
.subscribeOn(IOScheduler.apply)
s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
与远程服务器的连接可能随时中断,在这种情况下,我希望 Observable
尝试自动重新连接,但 s.retry
不会执行任何操作。我怎样才能做到这一点?也可以在不创建新的并重新订阅的情况下 "inside" 当前 Observable
完成吗?
您想为每个新订阅设置一个新的套接字连接。这是最简单的 (A)SyncOnSubscribe
,自版本 0.26.5
移植到 RxScala。一个你有这个可观察的你可以使用正常的错误控制方法,如 .retry
.
像这样:
val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
generator = () =>
sock
.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
.getInputStream
)(next = is => Try(is.read()) match {
case Success(-1) => Notification.OnCompleted()
case Success(byte) => Notification.OnNext(byte)
case Failure(e) => Notification.OnError(e)
},
onUnsubscribe = is => Try(is.close)
)
注意:这一次读取一个字节,效率不是很高。您可以使用 ASyncOnSubscribe 改进这一点,或者让您的可观察对象的每个事件都是一个字节数组。
注意:这是一个冷可观察对象,将为每个订阅者创建一个新套接字。例如,这将打开 2 个套接字:
socketObservable.foreach(b => System.out.print(b))
socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))
如果这不是你想要的,你可以用 .share
把它变成热门的
在下面的代码中,我将 TCP 套接字转换为 Observable[Array[Byte]]
:
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
sock
}(
socket => Observable.from[Bytes] {
val incoming = socket.getInputStream
val buffer = new Bytes(1024)
Stream.continually {
val read = incoming.read(buffer, 0, 1024)
buffer.take(read)
}.takeWhile(_.nonEmpty)
},
socket => {
println("Socket disposed")
socket.close
s.retry // Does not work
})
.subscribeOn(IOScheduler.apply)
s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
与远程服务器的连接可能随时中断,在这种情况下,我希望 Observable
尝试自动重新连接,但 s.retry
不会执行任何操作。我怎样才能做到这一点?也可以在不创建新的并重新订阅的情况下 "inside" 当前 Observable
完成吗?
您想为每个新订阅设置一个新的套接字连接。这是最简单的 (A)SyncOnSubscribe
,自版本 0.26.5
移植到 RxScala。一个你有这个可观察的你可以使用正常的错误控制方法,如 .retry
.
像这样:
val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
generator = () =>
sock
.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
.getInputStream
)(next = is => Try(is.read()) match {
case Success(-1) => Notification.OnCompleted()
case Success(byte) => Notification.OnNext(byte)
case Failure(e) => Notification.OnError(e)
},
onUnsubscribe = is => Try(is.close)
)
注意:这一次读取一个字节,效率不是很高。您可以使用 ASyncOnSubscribe 改进这一点,或者让您的可观察对象的每个事件都是一个字节数组。
注意:这是一个冷可观察对象,将为每个订阅者创建一个新套接字。例如,这将打开 2 个套接字:
socketObservable.foreach(b => System.out.print(b))
socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))
如果这不是你想要的,你可以用 .share