阿卡溪流。控制一次在 Akka 流中处理的项目数
Akka Streams. Control The Number of Items Being Processed in Akka Streams At One Time
Akka 流显着减少了我的样板代码并包含许多有用的功能。但是,我需要能够限制处理项目的速度。问题是我正在提供一个附加到资源源链接的 Hazelcast 队列,以便随着时间的推移(从单个在线站点)下载,但进入队列的链接数量可能会变得非常大。理想情况下,一次不超过 50-60 个请求 运行。 Akka Streams 中是否有允许我限制一次处理的项目数量的功能?
另一个限制是在与某些网站交互时需要复杂的状态管理、代码处理和其他功能。 Akka Http 对此无能为力。我的网络代码完全用 Jsoup 和 Apache Http 组件编写,偶尔调用基于 JavaFX 的服务器来呈现脚本。
我目前尝试使用文档中描述的缓冲区来控制输入速率如下:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
您正在寻找的机制是 mapAsync
(或 mapAsyncUnordered
,如果不需要保留顺序 - 就像您的示例中那样)。
这些组合器采用 parallelism
参数,目的是限制阶段可以 运行.
的并行任务数
它应该成为您 DownloadFlow
的一部分。
假设你的 DownloadFlow
运行s 异步代码,你可以这样构造它:
def download(input: Input): Future[Output] = ???
val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(downloadFlow)(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
由于您的下载流程具有有意义的具体化价值,它可能会稍微复杂一些,但希望您能理解。
有关详细信息,请参阅 docs。
Akka 流显着减少了我的样板代码并包含许多有用的功能。但是,我需要能够限制处理项目的速度。问题是我正在提供一个附加到资源源链接的 Hazelcast 队列,以便随着时间的推移(从单个在线站点)下载,但进入队列的链接数量可能会变得非常大。理想情况下,一次不超过 50-60 个请求 运行。 Akka Streams 中是否有允许我限制一次处理的项目数量的功能?
另一个限制是在与某些网站交互时需要复杂的状态管理、代码处理和其他功能。 Akka Http 对此无能为力。我的网络代码完全用 Jsoup 和 Apache Http 组件编写,偶尔调用基于 JavaFX 的服务器来呈现脚本。
我目前尝试使用文档中描述的缓冲区来控制输入速率如下:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
您正在寻找的机制是 mapAsync
(或 mapAsyncUnordered
,如果不需要保留顺序 - 就像您的示例中那样)。
这些组合器采用 parallelism
参数,目的是限制阶段可以 运行.
它应该成为您 DownloadFlow
的一部分。
假设你的 DownloadFlow
运行s 异步代码,你可以这样构造它:
def download(input: Input): Future[Output] = ???
val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(downloadFlow)(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
由于您的下载流程具有有意义的具体化价值,它可能会稍微复杂一些,但希望您能理解。
有关详细信息,请参阅 docs。