akka-http 服务器中的客户端会话存储

Client session storage in an akka-http server

在 akka-http 服务中,每个客户端会话如何缓存一些信息?这在文档中不是很明显。例如,我想为每个连接创建一个演员。

我应该在哪里创建 actor,如何从我的舞台中获取对它的引用?

我的服务是这样绑定的:

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = bindAddress, port = bindPort)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource
      .to(Sink.foreach { connection => 
        connection.handleWithSyncHandler (requestHandler)
        // seems like I should set up some session state storage here,
        // such as my actor      
      })
      .run()

...

及以后:

  val packetProcessor: Flow[A, B, Unit] =  Flow[A]
    .map {
      case Something =>
        // can i use the actor here, or access my session state?
    }

我怀疑我可能在试图使其适合时误解了整个范例。我不知道是否有内置的东西或我需要手动实现多少。

我发现 Agent 是一种非常方便的并发缓存机制。

比如说,您想要保留一个 运行 Set 您已连接的所有远程地址。您可以设置一个代理来存储值和一个 Flow 来写入缓存:

import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent

import scala.collection.immutable

val addressCache = Agent(immutable.Set.empty[java.net.InetSocketAddress])

import akka.stream.scaladsl.Flow

val cacheAddressFlow = Flow[IncomingConnection] map { conn =>
  addressCache send (_ + conn.remoteAddress) //updates the cache
  conn //forwards the connection to the rest of the stream
}

此 Flow 可以成为您的 Stream 的一部分:

val bindingFuture: Future[Http.ServerBinding] =
  serverSource.via(cacheAddressFlow)
              .to(Sink.foreach { connection => 
    connection.handleWithSyncHandler (requestHandler)
  })
  .run()

然后您可以 "query" 完全在绑定逻辑之外的缓存:

def somewhereElseInTheCode = {
  val currentAddressSet = addressCache.get

  println(s"address count so far: ${currentAddressSet.size}")
}

如果您的目标是将所有 IncomingConnection 值发送到 Actor 进行处理,那么可以使用 Sink.actorRef:

来完成
object ConnectionStreamTerminated

class ConnectionActor extends Actor {
  override def receive = {
    case conn : IncomingConnection => ???
    case ConnectionStreamTerminated => ???
  }
}

val actorRef = actorSystem actorOf Props[ConnectionActor]

val actorSink = 
  Sink.actorRef[IncomingConnection](actorRef, ConnectionStreamTerminated)

val bindingFuture: Future[Http.ServerBinding] =
  serverSource.runWith(actorSink)

由于建议的代理已被弃用。我建议使用 akka-http-session。它确保会话数据安全且无法被篡改。