Play 框架、Akka Persistence、PersistenceQuery,无法将 Source 转换为 Future Object for Action.async
Play framework, Akka Persistence, PersistenceQuery, unable to convert Source to Future Object for Action.async
我正在尝试获取持续事件列表并将其作为响应发送 (Action.async
)。但是我无法将 PersistenceQuery
结果转换为 Future
对象。这是代码
val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
e.event match {
case l: String =>
Future(Json.parse(l))
}
}
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
println(a)
a :+ b
})
finalResult
我能够在 runFold
内看到打印件,但 finalResult
从未归还。我什至尝试 Await
,即使等了几分钟,它也没有返回。此 finalResult
显示所有用户 activity 想要将其作为 Action.async
的响应发送。
请让我知道将 Source
转换为 Future
对象
的方法是什么
修复此问题首选使用currentEventsByPersistenceId
,此returns所有当前事件。 eventsByPersistenceId
和 take
终止流。但在我的例子中,take
函数需要收集 10 个事件,在我的例子中,我的事件少于 10 个,所以流在它累积至少 10 个事件之前不会终止。所以结果永远不会返回。
我正在尝试获取持续事件列表并将其作为响应发送 (Action.async
)。但是我无法将 PersistenceQuery
结果转换为 Future
对象。这是代码
val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
e.event match {
case l: String =>
Future(Json.parse(l))
}
}
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
println(a)
a :+ b
})
finalResult
我能够在 runFold
内看到打印件,但 finalResult
从未归还。我什至尝试 Await
,即使等了几分钟,它也没有返回。此 finalResult
显示所有用户 activity 想要将其作为 Action.async
的响应发送。
请让我知道将 Source
转换为 Future
对象
修复此问题首选使用currentEventsByPersistenceId
,此returns所有当前事件。 eventsByPersistenceId
和 take
终止流。但在我的例子中,take
函数需要收集 10 个事件,在我的例子中,我的事件少于 10 个,所以流在它累积至少 10 个事件之前不会终止。所以结果永远不会返回。