Scala / Akka Streams 中的元素分组

Grouping of elements in Scala / Akka Streams

假设我有一个不同水果的来源,我想将它们的数量插入数据库。

我可以这样做:

Flow[Fruits]
.map { item =>
    insertItemToDatabase(item)
}

但这显然很慢——为什么要将每个项目都插入到数据库中,当我可以将它们分组时?所以我想出了一个更好的解决方案:

Flow[Fruits]
.grouped(10000)
.map { items =>
    insertItemsToDatabase(items)
}

但这意味着我必须在内存中保存 10 000 个元素 [banana, orange, orange, orange, banana, ...],直到它们被刷新到数据库中。这不是低效的吗?也许我可以这样做:

Flow[Fruits]
.grouped(100)
.map { items =>
    consolidate(items)  // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
    insertMapToDatabase(mapOfItems)
}

根据我的理解,这也应该一次处理 10 000 个元素,但不应该占用那么多内存(假设元素经常重复)。但是每个键在内存中仍然重复了 100 次。当然我可以 .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()... 但是没有更好的方法吗?也许是这样的:

Flow[Fruits]
.map { item =>
    addToMap(item)
    if(myMap.length == 10000) {
        insertToDatabase(myMap)
        clearMyMap()
    }
}

但这不是打破了 Akka 流的概念,即处理阶段的独立性(以及并发性)吗?

如果 Fruit 集的基数较低,那么您可以保留一个包含所有计数的单一 Map,然后在流过所有 Fruit 值后将其刷新到数据库。

首先,构造一个将保持 运行 计数的流:

type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount = 
  Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
  (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)

现在创建一个接收最后一个 FruitCount 并实现流的接收器:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] = 
  fruitSource
    .via(fruitCountFlow)
    .to(lastFruitCountSink)
    .run()

然后可以使用 lastFruitCountFut 将值发送到数据库:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
  insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

使用 Iterator 是因为它是构造 TraversableOnce Fruit 项的内存效率最高的集合。

此解决方案将仅在内存中保留 1 Map,其中每个不同的水果类型有 1 个键,每个键有 1 个整数。