将数据从演员流式传输到播放结果

Streaming data from an actor to Play Result

我有数据以块的形式到达演员,我想 return 这些块作为 Play Result 中的流。由于从 Ok.stream 获得响应的唯一方法看起来是理想的候选者,因此如下所示:

Action.async { request =>
  (source ? GetStream()).map {
     case enumerator => Ok.stream(enumerator)
  }
}

我会 return 从我的 actor 发送 Enumerator[Array[Byte]],然后当消息到达 actor 时,在 actor 内部不断将块推入枚举器。但是:从 actor 返回一个可变的枚举器显然是某种违规行为。

有没有更合适的方法来完成这个?我认为 akka-streamakka.io 可能是可以解决问题的抽象 space,但我看不出它们将如何应用。

在提出更好的解决方案之前,我已经确定的解决方案是使用 ActorDSL 在我的非参与者调用者的上下文中捕获枚举器:

case GET(p"/stream/$streamId") => Action.async { request =>
    val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]

    actor(new Act {

      storage ! Get(streamId)
      become {
        case DataStart(id, parts, bytes) =>
          sender() ! DataAck(id)
          become {
            case DataPart(_, i, b) =>
              channel.push(b.toArray)
            case DataEnd(_) =>
              channel.eofAndEnd()
          }
      }
    })

    Ok.stream(enumerator).as("text/plain")
  }

反对从参与者返回和枚举器的论据是它是可变的和不可序列化的。但是您需要一个参与者来接收一系列消息以提供给枚举器。通过 DSL 创建参与者,它明确地嵌入到调用上下文中,因此不存在枚举器跨越序列化边界泄漏的风险。