如何将来自多个来源的结果组合成流
How to combine results into stream from multiple sources
我用的是slick-3.0.0,尝试用streaming。
假设有AccountsTable
和PreferencesTable
:
我想从 PreferencesTable
获取一些信息并将其用于来自 AccountsTable
的流中。例如(参见 TODO):
val somePrefQuery: Query[Rep[String], ...] = PreferencesTable.filter(...)
val somePrefAction = somePrefQuery.result
val somePrefStream = db.stream(somePrefAction)
val accountsStream: DatabasePublisher[String] =
db.stream(AccountsTable.map(_.id).result)
accountsStream.mapResult { accountId: String =>
//TODO how to get somePref value from
// somePrefQuery or somePrefAction or somePrefStream
// Is there best approach for such task?
val somePref: String = ???
val result:(String, String) = (accountId, somePref)
result
}
Akka 提供 concurrent streaming functionality 允许您以您描述的方式将流组合在一起。
您首先创建 accountId
值的来源:
import akka.stream.scaladsl.Source
val accountIdSrc : Source[String, _] =
Source fromPublisher (db stream (AccountsTable.map(_.id).result))
然后可以将此 Source
附加到 pref 查询逻辑:
def queryForPrefs(accountId : String) =
PreferenceTable
.filter(_.accountId === accountId)
.map(_.pref)
.result
case class PrefData(accountId : String, somePref : String)
val accountAndPrefSrc : Source[PrefData, _] =
accountIdSrc flatMapConcat { accountId =>
Source
.fromPublisher(db stream queryForPrefs(accountId))
.map(pref => PrefData(accountId, pref))
}
我用的是slick-3.0.0,尝试用streaming。
假设有AccountsTable
和PreferencesTable
:
我想从 PreferencesTable
获取一些信息并将其用于来自 AccountsTable
的流中。例如(参见 TODO):
val somePrefQuery: Query[Rep[String], ...] = PreferencesTable.filter(...)
val somePrefAction = somePrefQuery.result
val somePrefStream = db.stream(somePrefAction)
val accountsStream: DatabasePublisher[String] =
db.stream(AccountsTable.map(_.id).result)
accountsStream.mapResult { accountId: String =>
//TODO how to get somePref value from
// somePrefQuery or somePrefAction or somePrefStream
// Is there best approach for such task?
val somePref: String = ???
val result:(String, String) = (accountId, somePref)
result
}
Akka 提供 concurrent streaming functionality 允许您以您描述的方式将流组合在一起。
您首先创建 accountId
值的来源:
import akka.stream.scaladsl.Source
val accountIdSrc : Source[String, _] =
Source fromPublisher (db stream (AccountsTable.map(_.id).result))
然后可以将此 Source
附加到 pref 查询逻辑:
def queryForPrefs(accountId : String) =
PreferenceTable
.filter(_.accountId === accountId)
.map(_.pref)
.result
case class PrefData(accountId : String, somePref : String)
val accountAndPrefSrc : Source[PrefData, _] =
accountIdSrc flatMapConcat { accountId =>
Source
.fromPublisher(db stream queryForPrefs(accountId))
.map(pref => PrefData(accountId, pref))
}