在 akka-streams 中没有正确处理背压

backpressure is not properly handled in akka-streams

我使用 akka-streams 编写了一个简单的流 api 假设它会处理我的源,但不幸的是它不会。我确信我在源代码中做错了什么。我只是创建了一个生成大量元素的迭代器,假设它无关紧要,因为 akka-streams api 会处理背压。我做错了什么,这是我的迭代器。

def createData(args: Array[String]): Iterator[TimeSeriesValue] = {
var data = new ListBuffer[TimeSeriesValue]()
for (i <- 1 to range) {
  sessionId = UUID.randomUUID()
  for (j <- 1 to countersPerSession) {
    time = DateTime.now()
    keyName = s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j"
    for (k <- 1 to snapShotCount) {
      time = time.plusSeconds(2)
      fValue = new Random().nextLong()
      data += TimeSeriesValue(sessionId, keyName, time, fValue)
      totalRows += 1
    }
  }
}
data.iterator

}

问题主要出在行

data += TimeSeriesValue(sessionId, keyName, time, fValue)

您正在用 "very large number of elements" 不断添加到 ListBuffer。这正在消耗你所有的内存。 data.iterator 行只是将巨大的 ListBuffer blob 包装在迭代器中,以一次提供每个元素,它基本上只是一个转换。

您关于 "it won't matter because ... of backpressure" 的假设部分正确,即 akka Stream 将反应性地处理 TimeSeriesValue 值,但您甚至在到达 Source 构造函数之前就创建了大量值。

如果你希望这个迭代器是"lazy",即只在需要时产生值而不消耗内存,那么进行以下修改(注意:我分解了代码以使其更具可读性):

def createTimeSeries(startTime: Time, snapShotCount : Int, sessionId : UUID, keyName : String) = 
  Iterator.range(1, snapShotCount)
          .map(_ * 2)
          .map(startTime plusSeconds _)
          .map(t => TimeSeriesValue(sessionId, keyName, t, ThreadLocalRandom.current().nextLong()))

def sessionGenerator(countersPerSession : Int, sessionID : UUID) = 
  Iterator.range(1, countersPerSession)
          .map(j => s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j")
          .flatMap { keyName => 
    createTimeSeries(DateTime.now(), snapShotCount, sessionID, keyName)
  }

object UUIDIterator extends Iterator[UUID] {
  def hasNext : Boolean = true
  def next() : UUID = UUID.randomUUID()
}

def iterateOverIDs(range : Int) = 
  UUIDIterator.take(range)              
              .flatMap(sessionID => sessionGenerator(countersPerSession, sessionID))

以上每一个函数returns一个迭代器。因此,调用 iterateOverIDs 应该是即时的,因为没有立即完成任何工作并且正在消耗微量内存。然后可以将此迭代器传递到您的 Stream 中...