scalaz-stream 如何实现 `ask-then-wait-reply` tcp 客户端
scalaz-stream how to implement `ask-then-wait-reply` tcp client
我想实现一个客户端应用程序,它首先向服务器发送请求然后等待其回复(类似于 http)
我的客户端进程可能是
val topic = async.topic[ByteVector]
val client = topic.subscribe
这是api
trait Client {
val incoming = tcp.connect(...)(client)
val reqBus = topic.pubsh()
def ask(req: ByteVector): Task[Throwable \/ ByteVector] = {
(tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
???
}
}
那么,如何实现ask
的剩余部分呢?
通常,实现是通过接收器发布消息,然后等待对某些来源(例如您的主题)的某种回复来完成的。
实际上我们的代码中有很多这样的习语:
def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}
本质上,这第一个挂钩到回复流以等待任何结果 O
确认我们的请求已发送。然后我们发布消息 I
并咨询 pf
任何传入的 O
最终被翻译成 O2
然后终止。
我想实现一个客户端应用程序,它首先向服务器发送请求然后等待其回复(类似于 http)
我的客户端进程可能是
val topic = async.topic[ByteVector]
val client = topic.subscribe
这是api
trait Client {
val incoming = tcp.connect(...)(client)
val reqBus = topic.pubsh()
def ask(req: ByteVector): Task[Throwable \/ ByteVector] = {
(tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
???
}
}
那么,如何实现ask
的剩余部分呢?
通常,实现是通过接收器发布消息,然后等待对某些来源(例如您的主题)的某种回复来完成的。
实际上我们的代码中有很多这样的习语:
def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}
本质上,这第一个挂钩到回复流以等待任何结果 O
确认我们的请求已发送。然后我们发布消息 I
并咨询 pf
任何传入的 O
最终被翻译成 O2
然后终止。