scalaz-stream 如何实现 `ask-then-wait-reply` tcp 客户端

scalaz-stream how to implement `ask-then-wait-reply` tcp client

我想实现一个客户端应用程序,它首先向服务器发送请求然后等待其回复(类似于 http)

我的客户端进程可能是

 val topic = async.topic[ByteVector]
 val client = topic.subscribe

这是api

trait Client {
  val incoming = tcp.connect(...)(client)
   val reqBus = topic.pubsh()
   def ask(req: ByteVector): Task[Throwable \/ ByteVector] =  {
      (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
      ???
   }
}

那么,如何实现ask的剩余部分呢?

通常,实现是通过接收器发布消息,然后等待对某些来源(例如您的主题)的某种回复来完成的。

实际上我们的代码中有很多这样的习语:

def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
 merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}

本质上,这第一个挂钩到回复流以等待任何结果 O 确认我们的请求已发送。然后我们发布消息 I 并咨询 pf 任何传入的 O 最终被翻译成 O2 然后终止。