akka-http 服务器中的客户端会话存储
Client session storage in an akka-http server
在 akka-http 服务中,每个客户端会话如何缓存一些信息?这在文档中不是很明显。例如,我想为每个连接创建一个演员。
我应该在哪里创建 actor,如何从我的舞台中获取对它的引用?
我的服务是这样绑定的:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = bindAddress, port = bindPort)
val bindingFuture: Future[Http.ServerBinding] =
serverSource
.to(Sink.foreach { connection =>
connection.handleWithSyncHandler (requestHandler)
// seems like I should set up some session state storage here,
// such as my actor
})
.run()
...
及以后:
val packetProcessor: Flow[A, B, Unit] = Flow[A]
.map {
case Something =>
// can i use the actor here, or access my session state?
}
我怀疑我可能在试图使其适合时误解了整个范例。我不知道是否有内置的东西或我需要手动实现多少。
我发现 Agent
是一种非常方便的并发缓存机制。
比如说,您想要保留一个 运行 Set
您已连接的所有远程地址。您可以设置一个代理来存储值和一个 Flow
来写入缓存:
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent
import scala.collection.immutable
val addressCache = Agent(immutable.Set.empty[java.net.InetSocketAddress])
import akka.stream.scaladsl.Flow
val cacheAddressFlow = Flow[IncomingConnection] map { conn =>
addressCache send (_ + conn.remoteAddress) //updates the cache
conn //forwards the connection to the rest of the stream
}
此 Flow 可以成为您的 Stream 的一部分:
val bindingFuture: Future[Http.ServerBinding] =
serverSource.via(cacheAddressFlow)
.to(Sink.foreach { connection =>
connection.handleWithSyncHandler (requestHandler)
})
.run()
然后您可以 "query" 完全在绑定逻辑之外的缓存:
def somewhereElseInTheCode = {
val currentAddressSet = addressCache.get
println(s"address count so far: ${currentAddressSet.size}")
}
如果您的目标是将所有 IncomingConnection
值发送到 Actor
进行处理,那么可以使用 Sink.actorRef
:
来完成
object ConnectionStreamTerminated
class ConnectionActor extends Actor {
override def receive = {
case conn : IncomingConnection => ???
case ConnectionStreamTerminated => ???
}
}
val actorRef = actorSystem actorOf Props[ConnectionActor]
val actorSink =
Sink.actorRef[IncomingConnection](actorRef, ConnectionStreamTerminated)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.runWith(actorSink)
由于建议的代理已被弃用。我建议使用 akka-http-session
。它确保会话数据安全且无法被篡改。
在 akka-http 服务中,每个客户端会话如何缓存一些信息?这在文档中不是很明显。例如,我想为每个连接创建一个演员。
我应该在哪里创建 actor,如何从我的舞台中获取对它的引用?
我的服务是这样绑定的:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = bindAddress, port = bindPort)
val bindingFuture: Future[Http.ServerBinding] =
serverSource
.to(Sink.foreach { connection =>
connection.handleWithSyncHandler (requestHandler)
// seems like I should set up some session state storage here,
// such as my actor
})
.run()
...
及以后:
val packetProcessor: Flow[A, B, Unit] = Flow[A]
.map {
case Something =>
// can i use the actor here, or access my session state?
}
我怀疑我可能在试图使其适合时误解了整个范例。我不知道是否有内置的东西或我需要手动实现多少。
我发现 Agent
是一种非常方便的并发缓存机制。
比如说,您想要保留一个 运行 Set
您已连接的所有远程地址。您可以设置一个代理来存储值和一个 Flow
来写入缓存:
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent
import scala.collection.immutable
val addressCache = Agent(immutable.Set.empty[java.net.InetSocketAddress])
import akka.stream.scaladsl.Flow
val cacheAddressFlow = Flow[IncomingConnection] map { conn =>
addressCache send (_ + conn.remoteAddress) //updates the cache
conn //forwards the connection to the rest of the stream
}
此 Flow 可以成为您的 Stream 的一部分:
val bindingFuture: Future[Http.ServerBinding] =
serverSource.via(cacheAddressFlow)
.to(Sink.foreach { connection =>
connection.handleWithSyncHandler (requestHandler)
})
.run()
然后您可以 "query" 完全在绑定逻辑之外的缓存:
def somewhereElseInTheCode = {
val currentAddressSet = addressCache.get
println(s"address count so far: ${currentAddressSet.size}")
}
如果您的目标是将所有 IncomingConnection
值发送到 Actor
进行处理,那么可以使用 Sink.actorRef
:
object ConnectionStreamTerminated
class ConnectionActor extends Actor {
override def receive = {
case conn : IncomingConnection => ???
case ConnectionStreamTerminated => ???
}
}
val actorRef = actorSystem actorOf Props[ConnectionActor]
val actorSink =
Sink.actorRef[IncomingConnection](actorRef, ConnectionStreamTerminated)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.runWith(actorSink)
由于建议的代理已被弃用。我建议使用 akka-http-session
。它确保会话数据安全且无法被篡改。