如何将来自多个来源的结果组合成流

How to combine results into stream from multiple sources

我用的是slick-3.0.0,尝试用streaming

假设有AccountsTablePreferencesTable:

我想从 PreferencesTable 获取一些信息并将其用于来自 AccountsTable 的流中。例如(参见 TODO):

val somePrefQuery: Query[Rep[String], ...] = PreferencesTable.filter(...)
val somePrefAction = somePrefQuery.result
val somePrefStream = db.stream(somePrefAction)

val accountsStream: DatabasePublisher[String] = 
                              db.stream(AccountsTable.map(_.id).result)

accountsStream.mapResult { accountId: String =>
   //TODO how to get somePref value from 
   //     somePrefQuery or somePrefAction or somePrefStream
   // Is there best approach for such task?
   val somePref: String = ???

   val result:(String, String) = (accountId, somePref)
   result
}

Akka 提供 concurrent streaming functionality 允许您以您描述的方式将流组合在一起。

您首先创建 accountId 值的来源:

import akka.stream.scaladsl.Source

val accountIdSrc : Source[String, _] = 
  Source fromPublisher (db stream (AccountsTable.map(_.id).result))

然后可以将此 Source 附加到 pref 查询逻辑:

def queryForPrefs(accountId : String) =
  PreferenceTable
    .filter(_.accountId === accountId)
    .map(_.pref)
    .result

case class PrefData(accountId : String, somePref : String)

val accountAndPrefSrc : Source[PrefData, _] = 
  accountIdSrc flatMapConcat { accountId =>
    Source
      .fromPublisher(db stream queryForPrefs(accountId))
      .map(pref => PrefData(accountId, pref))
  }