在 akka-streams 中没有正确处理背压
backpressure is not properly handled in akka-streams
我使用 akka-streams 编写了一个简单的流 api 假设它会处理我的源,但不幸的是它不会。我确信我在源代码中做错了什么。我只是创建了一个生成大量元素的迭代器,假设它无关紧要,因为 akka-streams api 会处理背压。我做错了什么,这是我的迭代器。
def createData(args: Array[String]): Iterator[TimeSeriesValue] = {
var data = new ListBuffer[TimeSeriesValue]()
for (i <- 1 to range) {
sessionId = UUID.randomUUID()
for (j <- 1 to countersPerSession) {
time = DateTime.now()
keyName = s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j"
for (k <- 1 to snapShotCount) {
time = time.plusSeconds(2)
fValue = new Random().nextLong()
data += TimeSeriesValue(sessionId, keyName, time, fValue)
totalRows += 1
}
}
}
data.iterator
}
问题主要出在行
data += TimeSeriesValue(sessionId, keyName, time, fValue)
您正在用 "very large number of elements" 不断添加到 ListBuffer
。这正在消耗你所有的内存。 data.iterator
行只是将巨大的 ListBuffer blob 包装在迭代器中,以一次提供每个元素,它基本上只是一个转换。
您关于 "it won't matter because ... of backpressure" 的假设部分正确,即 akka Stream 将反应性地处理 TimeSeriesValue
值,但您甚至在到达 Source 构造函数之前就创建了大量值。
如果你希望这个迭代器是"lazy",即只在需要时产生值而不消耗内存,那么进行以下修改(注意:我分解了代码以使其更具可读性):
def createTimeSeries(startTime: Time, snapShotCount : Int, sessionId : UUID, keyName : String) =
Iterator.range(1, snapShotCount)
.map(_ * 2)
.map(startTime plusSeconds _)
.map(t => TimeSeriesValue(sessionId, keyName, t, ThreadLocalRandom.current().nextLong()))
def sessionGenerator(countersPerSession : Int, sessionID : UUID) =
Iterator.range(1, countersPerSession)
.map(j => s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j")
.flatMap { keyName =>
createTimeSeries(DateTime.now(), snapShotCount, sessionID, keyName)
}
object UUIDIterator extends Iterator[UUID] {
def hasNext : Boolean = true
def next() : UUID = UUID.randomUUID()
}
def iterateOverIDs(range : Int) =
UUIDIterator.take(range)
.flatMap(sessionID => sessionGenerator(countersPerSession, sessionID))
以上每一个函数returns一个迭代器。因此,调用 iterateOverIDs
应该是即时的,因为没有立即完成任何工作并且正在消耗微量内存。然后可以将此迭代器传递到您的 Stream 中...
我使用 akka-streams 编写了一个简单的流 api 假设它会处理我的源,但不幸的是它不会。我确信我在源代码中做错了什么。我只是创建了一个生成大量元素的迭代器,假设它无关紧要,因为 akka-streams api 会处理背压。我做错了什么,这是我的迭代器。
def createData(args: Array[String]): Iterator[TimeSeriesValue] = {
var data = new ListBuffer[TimeSeriesValue]()
for (i <- 1 to range) {
sessionId = UUID.randomUUID()
for (j <- 1 to countersPerSession) {
time = DateTime.now()
keyName = s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j"
for (k <- 1 to snapShotCount) {
time = time.plusSeconds(2)
fValue = new Random().nextLong()
data += TimeSeriesValue(sessionId, keyName, time, fValue)
totalRows += 1
}
}
}
data.iterator
}
问题主要出在行
data += TimeSeriesValue(sessionId, keyName, time, fValue)
您正在用 "very large number of elements" 不断添加到 ListBuffer
。这正在消耗你所有的内存。 data.iterator
行只是将巨大的 ListBuffer blob 包装在迭代器中,以一次提供每个元素,它基本上只是一个转换。
您关于 "it won't matter because ... of backpressure" 的假设部分正确,即 akka Stream 将反应性地处理 TimeSeriesValue
值,但您甚至在到达 Source 构造函数之前就创建了大量值。
如果你希望这个迭代器是"lazy",即只在需要时产生值而不消耗内存,那么进行以下修改(注意:我分解了代码以使其更具可读性):
def createTimeSeries(startTime: Time, snapShotCount : Int, sessionId : UUID, keyName : String) =
Iterator.range(1, snapShotCount)
.map(_ * 2)
.map(startTime plusSeconds _)
.map(t => TimeSeriesValue(sessionId, keyName, t, ThreadLocalRandom.current().nextLong()))
def sessionGenerator(countersPerSession : Int, sessionID : UUID) =
Iterator.range(1, countersPerSession)
.map(j => s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j")
.flatMap { keyName =>
createTimeSeries(DateTime.now(), snapShotCount, sessionID, keyName)
}
object UUIDIterator extends Iterator[UUID] {
def hasNext : Boolean = true
def next() : UUID = UUID.randomUUID()
}
def iterateOverIDs(range : Int) =
UUIDIterator.take(range)
.flatMap(sessionID => sessionGenerator(countersPerSession, sessionID))
以上每一个函数returns一个迭代器。因此,调用 iterateOverIDs
应该是即时的,因为没有立即完成任何工作并且正在消耗微量内存。然后可以将此迭代器传递到您的 Stream 中...