将数据从演员流式传输到播放结果
Streaming data from an actor to Play Result
我有数据以块的形式到达演员,我想 return 这些块作为 Play Result
中的流。由于从 Ok.stream
获得响应的唯一方法看起来是理想的候选者,因此如下所示:
Action.async { request =>
(source ? GetStream()).map {
case enumerator => Ok.stream(enumerator)
}
}
我会 return 从我的 actor 发送 Enumerator[Array[Byte]]
,然后当消息到达 actor 时,在 actor 内部不断将块推入枚举器。但是:从 actor 返回一个可变的枚举器显然是某种违规行为。
有没有更合适的方法来完成这个?我认为 akka-stream
或 akka.io
可能是可以解决问题的抽象 space,但我看不出它们将如何应用。
在提出更好的解决方案之前,我已经确定的解决方案是使用 ActorDSL 在我的非参与者调用者的上下文中捕获枚举器:
case GET(p"/stream/$streamId") => Action.async { request =>
val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]
actor(new Act {
storage ! Get(streamId)
become {
case DataStart(id, parts, bytes) =>
sender() ! DataAck(id)
become {
case DataPart(_, i, b) =>
channel.push(b.toArray)
case DataEnd(_) =>
channel.eofAndEnd()
}
}
})
Ok.stream(enumerator).as("text/plain")
}
反对从参与者返回和枚举器的论据是它是可变的和不可序列化的。但是您需要一个参与者来接收一系列消息以提供给枚举器。通过 DSL 创建参与者,它明确地嵌入到调用上下文中,因此不存在枚举器跨越序列化边界泄漏的风险。
我有数据以块的形式到达演员,我想 return 这些块作为 Play Result
中的流。由于从 Ok.stream
获得响应的唯一方法看起来是理想的候选者,因此如下所示:
Action.async { request =>
(source ? GetStream()).map {
case enumerator => Ok.stream(enumerator)
}
}
我会 return 从我的 actor 发送 Enumerator[Array[Byte]]
,然后当消息到达 actor 时,在 actor 内部不断将块推入枚举器。但是:从 actor 返回一个可变的枚举器显然是某种违规行为。
有没有更合适的方法来完成这个?我认为 akka-stream
或 akka.io
可能是可以解决问题的抽象 space,但我看不出它们将如何应用。
在提出更好的解决方案之前,我已经确定的解决方案是使用 ActorDSL 在我的非参与者调用者的上下文中捕获枚举器:
case GET(p"/stream/$streamId") => Action.async { request =>
val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]
actor(new Act {
storage ! Get(streamId)
become {
case DataStart(id, parts, bytes) =>
sender() ! DataAck(id)
become {
case DataPart(_, i, b) =>
channel.push(b.toArray)
case DataEnd(_) =>
channel.eofAndEnd()
}
}
})
Ok.stream(enumerator).as("text/plain")
}
反对从参与者返回和枚举器的论据是它是可变的和不可序列化的。但是您需要一个参与者来接收一系列消息以提供给枚举器。通过 DSL 创建参与者,它明确地嵌入到调用上下文中,因此不存在枚举器跨越序列化边界泄漏的风险。