RxJava 从多个源接收数据包并在每个数据包之间延迟写入 OutputStream
RxJava receive data packets from multiple source and write to OutputStream with delay between each packet
我有 Java TCP 客户端套接字读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这行得通。
有时我也写到OutputStream。命令被转换成单个数据包(byte[])并推送到流上。为此,我使用
public void writeToSocket(byte[] packet) {
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
}).subscribeOn(Schedulers.io()).subscribe();
}
现在我要执行
outputStream.write(packet);
outputStream.flush();
满足以下条件
- Though source packet is getting created from multiple places (with different commands) simultaneously, execute above for each packet with a delay of 50 milliseconds. Ideally queue-up the packets and execute with delay.
Example:
Place1: createCommand1(),
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()
有没有什么方法可以使用 RxJava 来实现。提前致谢!
您可以使用序列化的 PublishSubject
收集字节,然后使用 concatMapCompletable
执行写入,然后延迟:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.concatMapCompletable(bytes ->
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
})
.subscribeOn(Schedulers.io())
.andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
)
.subscribe();
或者,如果您不介意始终将一个线程专用于发射,您可以在 doOnNext
:
中执行写入和睡眠
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.observeOn(Schedulers.io())
.doOnNext(packet -> {
outputStream.write(packet);
outputStream.flush();
Thread.sleep(50);
})
.subscribe();
我有 Java TCP 客户端套接字读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这行得通。
有时我也写到OutputStream。命令被转换成单个数据包(byte[])并推送到流上。为此,我使用
public void writeToSocket(byte[] packet) {
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
}).subscribeOn(Schedulers.io()).subscribe();
}
现在我要执行
outputStream.write(packet);
outputStream.flush();
满足以下条件
- Though source packet is getting created from multiple places (with different commands) simultaneously, execute above for each packet with a delay of 50 milliseconds. Ideally queue-up the packets and execute with delay.
Example:
Place1: createCommand1(),
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()
有没有什么方法可以使用 RxJava 来实现。提前致谢!
您可以使用序列化的 PublishSubject
收集字节,然后使用 concatMapCompletable
执行写入,然后延迟:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.concatMapCompletable(bytes ->
Completable.fromAction(() -> {
outputStream.write(packet);
outputStream.flush();
})
.subscribeOn(Schedulers.io())
.andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
)
.subscribe();
或者,如果您不介意始终将一个线程专用于发射,您可以在 doOnNext
:
var subject = PublishSubject.<byte[]>create().toSerialized();
subject
.observeOn(Schedulers.io())
.doOnNext(packet -> {
outputStream.write(packet);
outputStream.flush();
Thread.sleep(50);
})
.subscribe();