从 Akka 流中的 Future 创建背压
Create backpressure from a Future inside an Akka stream
我是 Akka 流和一般流的新手,所以我可能在概念层面上完全误解了一些东西,但是有什么方法可以在未来解决之前产生背压吗?基本上我想做的是这样的:
object Parser {
def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.map(values => doAsyncOp(values))
.runWith(Sink.seq)
def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???
字节从文件中读取并流式传输到解析器,解析器发出 Seq
s of ExampleObject
s,然后将这些数据流式传输到异步操作 returns a Future
。我想做到这一点,以便在 Future
解决之前,流的其余部分得到背压,然后在 Future 解决后恢复,将另一个 Seq[ExampleObject]
传递给 doAsyncOp
,从而恢复背压等等。
现在我正在使用:
Await.result(doAsyncOp(values), 10 seconds)
但我的理解是,这会锁定整个线程并且很糟糕。有没有更好的解决办法?
如果有帮助,大局是我正在尝试使用 Jawn 逐块解析一个非常大的 JSON 文件(太大而无法放入内存),然后将对象传递给 ElasticSearch在解析时被索引 - ElasticSearch 有一个包含 50 个待处理操作的队列,如果溢出,它会开始拒绝新对象。
这很容易。你需要使用 mapAync
:)
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.mapAsync(4)(values => doAsyncOp(values))
.runWith(Sink.seq)
其中 4
是并行级别。
我是 Akka 流和一般流的新手,所以我可能在概念层面上完全误解了一些东西,但是有什么方法可以在未来解决之前产生背压吗?基本上我想做的是这样的:
object Parser {
def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.map(values => doAsyncOp(values))
.runWith(Sink.seq)
def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???
字节从文件中读取并流式传输到解析器,解析器发出 Seq
s of ExampleObject
s,然后将这些数据流式传输到异步操作 returns a Future
。我想做到这一点,以便在 Future
解决之前,流的其余部分得到背压,然后在 Future 解决后恢复,将另一个 Seq[ExampleObject]
传递给 doAsyncOp
,从而恢复背压等等。
现在我正在使用:
Await.result(doAsyncOp(values), 10 seconds)
但我的理解是,这会锁定整个线程并且很糟糕。有没有更好的解决办法?
如果有帮助,大局是我正在尝试使用 Jawn 逐块解析一个非常大的 JSON 文件(太大而无法放入内存),然后将对象传递给 ElasticSearch在解析时被索引 - ElasticSearch 有一个包含 50 个待处理操作的队列,如果溢出,它会开始拒绝新对象。
这很容易。你需要使用 mapAync
:)
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.mapAsync(4)(values => doAsyncOp(values))
.runWith(Sink.seq)
其中 4
是并行级别。