在流的生命周期内缓存已处理的消息
Caching a processed message for the lifetime of a stream
我正在重做之前的问题,以使其更加集中和清晰。
我有以下情况:
case class Hello(u:String) extends A
case class Goodbye extends A
case class GoodbyeAck(u:String) extends B
val myProcessor: Flow[A, B, Unit] = Flow[A]
.map {
case Hello(u:String) => // I want to cache u (the username)
case Goodbye => GoodbyeAck(u)
}
因此,在流的开头我收到了一个 Hello(用户名),在流的末尾,我收到了一个 Goodbye。我想回复再见(用户名)。
我应该如何缓存用户名 (u) 以便它在流的生命周期内可用,这样我就可以在该说再见的时候拥有它?我可以利用一些上下文吗?还是需要在框架外做?
这似乎是少数几个需要可变状态的示例之一。虽然不是 "purely functional",但可以通过不允许外部影响但仍然提供您正在寻找的功能的方式来隔离状态。
注意:您的问题没有具体说明在收到 Hello 的情况下,地图应 return 编辑什么 B 值。类型 B 的值是必需的,因为简单的缓存是类型 Unit。因此使用flatMapConcat来符合问题中Flow的要求,即return收到Hello时什么都没有:
def cacheProcessor(defaultString : String = "") : Flow[A, B, Unit] = {
var userCache : String = defaultString
Flow[A] flatMapConcat {
case Hello(u : String) => {
userCache = u
Source.empty[B]
}
case Goodbye => Source.single[B](GoodbyeAck(userCache))
}
}//end def cacheProcessor
尽管 userCache 是可变状态,但无法在 returned Flow.flatMapConcat 之外访问它。重要的一点是 cacheProcessor 必须是 def
以便每个 Flow 都有一个唯一的 userCache
。
我正在重做之前的问题,以使其更加集中和清晰。 我有以下情况:
case class Hello(u:String) extends A
case class Goodbye extends A
case class GoodbyeAck(u:String) extends B
val myProcessor: Flow[A, B, Unit] = Flow[A]
.map {
case Hello(u:String) => // I want to cache u (the username)
case Goodbye => GoodbyeAck(u)
}
因此,在流的开头我收到了一个 Hello(用户名),在流的末尾,我收到了一个 Goodbye。我想回复再见(用户名)。
我应该如何缓存用户名 (u) 以便它在流的生命周期内可用,这样我就可以在该说再见的时候拥有它?我可以利用一些上下文吗?还是需要在框架外做?
这似乎是少数几个需要可变状态的示例之一。虽然不是 "purely functional",但可以通过不允许外部影响但仍然提供您正在寻找的功能的方式来隔离状态。
注意:您的问题没有具体说明在收到 Hello 的情况下,地图应 return 编辑什么 B 值。类型 B 的值是必需的,因为简单的缓存是类型 Unit。因此使用flatMapConcat来符合问题中Flow的要求,即return收到Hello时什么都没有:
def cacheProcessor(defaultString : String = "") : Flow[A, B, Unit] = {
var userCache : String = defaultString
Flow[A] flatMapConcat {
case Hello(u : String) => {
userCache = u
Source.empty[B]
}
case Goodbye => Source.single[B](GoodbyeAck(userCache))
}
}//end def cacheProcessor
尽管 userCache 是可变状态,但无法在 returned Flow.flatMapConcat 之外访问它。重要的一点是 cacheProcessor 必须是 def
以便每个 Flow 都有一个唯一的 userCache
。