如何在 Akka Typed 中使用 AskPattern.ask? (Java/Kotlin)

How to use AskPattern.ask in Akka Typed? (Java/Kotlin)

我在使用 Akka Typed 中的询问模式时遇到了一些问题。我正在尝试从演员外部发送消息,并试图关注 this example。但是,我认为我的设置与示例有点不同:

object SimulatorManager {

    fun create(): Behavior<SimulatorManagerMessage> {
        val state = ... // initialize state
        return waitingToStartSimulation(state)
    }

    private fun waitingToStartSimulation(state: SimulatorManagerState): Behavior<SimulatorManagerMessage> {
        return Behaviors.receive { context, message ->
            when (message) {
                is StartSimulation -> {
                    // start simulation and update state
                    waitingToStartSimulation(state)
                }
                else -> Behaviors.same()
            }
        }
    }
}

sealed class SimulatorManagerMessage

data class StartSimulation(
    val consumer: ConsumerCredentials,
    val storeId: Long,
    val itemId: Long,
    val simTimeIncrement: Double,
    val realTimeIncrement: Double,
) : SimulatorManagerMessage()

我正在尝试从 Actor 系统外部向 SimulatorManager 发送 StartSimulation 消息。但是,我对为 replyTo 函数参数输入什么感到困惑。

class SimulatorController(
    val systemRef: ActorSystem<SimulatorManagerMessage>,
    private val simManagerRef: ActorRef<SimulatorManagerMessage>
) {

    fun startSimulation(request: StartSimulationRequest) {
        val msg = request.toMessage()
        val result = AskPattern.ask<SimulatorManagerMessage, SimulatorController>(
            simManagerRef,
            { replyTo ->  },        // what should go here? 
            Duration.ofSeconds(15),
            systemRef.scheduler()
        )
    }
}

它说参数的类型应该是Function<ActorRef<SimulatorController!>!, SimulatorManagerMessage!>!,但我不知道如何创建这样的函数。任何帮助将不胜感激!!

replyTo -> 函数构造要发送的消息并注入对将接收消息作为回复的参与者的引用。

这是因为,为了异步,ask 模式有效地做的是:

  • 向某个演员发送消息(称之为“A”)
  • 产生一个演员(演员“B”),其唯一目的是接收回复并用该回复完成未来(或者如果达到超时则使该未来失败;演员“B”将在它收到消息或超时)

在经典的(未类型化的)Akka 中,消息将被构造为接收者可以确定每条消息的 sender,因此询问模式非常简单(但是,当从演员发送询问时,您必须小心由此产生的未来:基本上,您可以做的唯一安全的事情就是调整邮件并将其通过管道发送到您的邮箱)。

在 Akka Typed 中,因为 ActorRef 现在控制可以发送哪些消息,所以 sender 不可用。所以现在您必须在构成协议的消息中显式建模预期回复的类型。

这是我承认这是我写的第一个 Kotlin,但你可能有这样的东西:

sealed class GradebookCommand

data class GetGradeFor(val student: String, val replyTo: ActorRef<GradeForStudent>): GradebookCommand()

data class GradeForStudent(val student: String, val score: Int, val outOf: Int)

sealed class GradebookRouterCommand

data class GetGradebookActorFor(val classId: String, val replyTo: ActorRef<GradebookIs>): GradebookRouterCommand()

data class GradebookIs(val classId: String, val gradebookActor: ActorRef<GradebookCommand>)

所以假设 gradebookRouter 是一个 ActorRef<GradebookRouterCommand>,你会问

val classId: String = "asdf"
val gradebookFuture: CompletionStage<GradebookIs> =
  AskPattern.ask<GradebookRouterCommand, GradebookIs>(
    gradebookRouter,
    { replyTo -> GetGradebookActorFor(classId, replyTo) },
    Duration.ofSeconds(15),
    systemRef.scheduler
  )

gradebookFuture 最终将通过 gradebookRouter 导致发送的 GradebookIs 消息作为回复完成(我使用该措辞是因为,明确说明 replyTo 是它有助于将工作委托给另一个参与者:您也可以在 Akka Classic 中这样做,但是在保留发件人方面有一些微妙之处,很容易搞砸)。

// This is perhaps where my Kotlin/Java futures code goes off the rails
val student: String = "Parry Hotter"
val gradeFuture: CompletionStage<GradeForStudent> =
  gradebookFuture.thenComposeAsync({ gradebook ->
    AskPattern.ask<GradebookCommand, GradeForStudent>(
      gradebook,
      { replyTo -> GetGradeFor(student, replyTo) },
      Duration.ofSeconds(15),
      systemRef.scheduler
    )})

免责声明:Java/Kotlin 的类型系统相对于 Scala 的微妙之处(例如 covariance/contravariance 周围)可能会使该代码无法工作,但我希望它能使事情变得相当清楚。

TL;DR:确保您的邮件中有用作回复地址的字段。它们不必命名为 replyTo,尽管这肯定是作为惯例出现的。