RxJava 从多个源接收数据包并在每个数据包之间延迟写入 OutputStream

RxJava receive data packets from multiple source and write to OutputStream with delay between each packet

我有 Java TCP 客户端套接字读取 InputStream 并通过 RxJava PublishSubject 将数据包分发到应用程序的各个部分。这行得通。

有时我也写到OutputStream。命令被转换成单个数据包(byte[])并推送到流上。为此,我使用

public void writeToSocket(byte[] packet) {
    Completable.fromAction(() -> {
         outputStream.write(packet);
         outputStream.flush();
    }).subscribeOn(Schedulers.io()).subscribe(); 
}

现在我要执行

    outputStream.write(packet);
    outputStream.flush();

满足以下条件

  1. Though source packet is getting created from multiple places (with different commands) simultaneously, execute above for each packet with a delay of 50 milliseconds. Ideally queue-up the packets and execute with delay.
Example:
Place1: createCommand1(), 
Place2: createCommand1(), createCommand4()
Place3: createCommand1(), createCommand2(), .... createCommand10()

有没有什么方法可以使用 RxJava 来实现。提前致谢!

您可以使用序列化的 PublishSubject 收集字节,然后使用 concatMapCompletable 执行写入,然后延迟:

var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .concatMapCompletable(bytes -> 
       Completable.fromAction(() -> {
           outputStream.write(packet);
           outputStream.flush();
       })
       .subscribeOn(Schedulers.io())
       .andThen(Completable.timer(50, TimeUnit.MILLISECONDS))
   )
   .subscribe();

或者,如果您不介意始终将一个线程专用于发射,您可以在 doOnNext:

中执行写入和睡眠
var subject = PublishSubject.<byte[]>create().toSerialized();

subject
  .observeOn(Schedulers.io())
  .doOnNext(packet -> {
     outputStream.write(packet);
     outputStream.flush();
     Thread.sleep(50);
  })
  .subscribe();