Scala 线程池 - 同时调用 API
Scala Thread Pool - Invoking API's Concurrently
我在数据块中有一个用例,其中必须对 URL 的数据集进行 API 调用。该数据集有大约 100K 条记录。
允许的最大并发数为 3。
我在 Scala 中实现了实现,在 databricks notebook 中实现了 运行。除了排队等待的一个元素外,我觉得这里缺少一些东西。
阻塞队列和线程池是解决这个问题的正确方法吗
在下面的代码中,我修改了代码,而不是从数据集中读取,而是在 Seq 上采样。
任何 help/thought 将不胜感激。
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit;
var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)
val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))
val pool = Executors.newFixedThreadPool(3)
var i = 0
inpDS.foreach{
ix => {
inpQueue.put(ix)
val t = new ConsumerAPIThread()
t.setName("MyThread-"+i+" ")
pool.execute(t)
}
i = i+1
}
println("Final Queue Size = " +inpQueue.size+"\n")
class ConsumerAPIThread() extends Thread
{
var name =""
override def run()
{
val urlDetail = inpQueue.take()
print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n")
triggerAPI((urlDetail._1, urlDetail._2))
}
def triggerAPI(params:(Int,String)){
try{
val result = scala.io.Source.fromURL(params._2)
println("" +result)
}catch{
case ex:Exception => {
println("Exception caught")
}
}
}
def ConsumerAPIThread(s:String)
{
name = s;
}
}
您需要在完成工作后关闭 Executor
否则它将等待。
尝试在程序末尾添加 pool.shutdown()
。
因此,您有两个要求:功能性要求是您希望异步处理列表中的项目,非功能性要求是您希望一次处理的项目不超过三个。
关于后者,好的是,正如您已经在问题中展示的那样,Java 原生公开了一个很好地打包的 Executor
,即 运行 线程上的任务具有固定大小的池,优雅地允许您在使用线程时限制并发级别。
转向功能需求,Scala 通过将某些功能精确地作为其标准的一部分来提供帮助 API。特别是它使用 scala.concurrent.Future
,因此为了使用它,我们必须根据 Future
重新构造 triggerAPI
。该函数的内容不是特别相关,所以我们现在主要关注它的(修改后的)签名:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
Future {
// some code that takes some time to run...
}
请注意现在 triggerAPI
return 是 Future
。 Future
可以被认为是最终要计算的东西的读句柄。特别是,这是一个 Future[Unit]
,其中 Unit
代表 “我们并不特别关心这个函数的输出,但主要关心它的副作用”.
此外,请注意该方法现在采用 隐式参数 ,即 ExecutionContext
。 ExecutionContext
用于为 Future
提供某种形式的计算发生环境。 Scala 有一个 API 从 java.util.concurrent.ExecutorService
创建一个 ExecutionContext
,所以这将派上用场 运行 我们在固定线程池上的计算,运行ning在任何给定时间不超过三个回调。
在继续之前,如果您对 Future
s、ExecutionContext
s 和 隐式参数 有疑问,Scala 文档是您最好的知识来源(这里有一些建议:1, 2)。
现在我们有了新的 triggerAPI
方法,我们可以使用 Future.traverse
(here is the documentation for Scala 2.12 -- 在撰写本文时最新版本是 2.13,但尽我所能知识 Spark 用户暂时停留在 2.12。
tl;dr of Future.traverse
是它采用某种形式的容器和一个函数,该函数接收该容器中的项目,returns a Future
其他东西。该函数将应用于容器中的每个项目,结果将是结果容器的 Future
。在你的例子中:容器是 List
,物品是 (Int, String)
,而你 return 的其他东西是 Unit
.
这意味着您可以像这样简单地调用它:
Future.traverse(inpDS)(triggerAPI)
并且 triggerAPI
将应用于 inpDS
中的每个项目。
通过确保线程池支持的执行上下文在调用 Future.traverse
时处于隐式范围内,项目将使用所需的线程池进行处理。
调用的结果是Future[List[Unit]]
,这不是很有趣,可以简单地丢弃(因为你只对副作用感兴趣)。
说了很多,如果您想使用我描述的代码,可以这样做 here on Scastie。
供参考,这是整个实现:
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.duration.DurationLong
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
val datasets = List(
(1, "https://google.com/2X6barD"),
(2, "https://google.com/3d9vCgW"),
(3, "https://google.com/2M02Xz0"),
(4, "https://google.com/2XOu2uL"),
(5, "https://google.com/2AfBWF0"),
(6, "https://google.com/36AEKsw"),
(7, "https://google.com/3enBxz7"),
(8, "https://google.com/36ABq0x"),
(9, "https://google.com/2XBjmiF")
)
val executor: ExecutorService = Executors.newFixedThreadPool(3)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)
def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
Future {
val (index, _) = params
println(s"+ started processing $index")
val start = System.nanoTime() / 1000000
Iterator.from(0).map(_ + 1).drop(100000000).take(1).toList.head // a noticeably slow operation
val end = System.nanoTime() / 1000000
val duration = (end - start).millis
println(s"- finished processing $index after $duration")
}
Future.traverse(datasets)(triggerAPI).onComplete {
case result =>
println("* processing is over, shutting down the executor")
executionContext.shutdown()
}
我在数据块中有一个用例,其中必须对 URL 的数据集进行 API 调用。该数据集有大约 100K 条记录。
允许的最大并发数为 3。
我在 Scala 中实现了实现,在 databricks notebook 中实现了 运行。除了排队等待的一个元素外,我觉得这里缺少一些东西。
阻塞队列和线程池是解决这个问题的正确方法吗
在下面的代码中,我修改了代码,而不是从数据集中读取,而是在 Seq 上采样。 任何 help/thought 将不胜感激。
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit;
var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)
val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))
val pool = Executors.newFixedThreadPool(3)
var i = 0
inpDS.foreach{
ix => {
inpQueue.put(ix)
val t = new ConsumerAPIThread()
t.setName("MyThread-"+i+" ")
pool.execute(t)
}
i = i+1
}
println("Final Queue Size = " +inpQueue.size+"\n")
class ConsumerAPIThread() extends Thread
{
var name =""
override def run()
{
val urlDetail = inpQueue.take()
print(this.getName()+" "+ Thread.currentThread().getName() + " popped "+urlDetail+" Queue Size "+inpQueue.size+" \n")
triggerAPI((urlDetail._1, urlDetail._2))
}
def triggerAPI(params:(Int,String)){
try{
val result = scala.io.Source.fromURL(params._2)
println("" +result)
}catch{
case ex:Exception => {
println("Exception caught")
}
}
}
def ConsumerAPIThread(s:String)
{
name = s;
}
}
您需要在完成工作后关闭 Executor
否则它将等待。
尝试在程序末尾添加 pool.shutdown()
。
因此,您有两个要求:功能性要求是您希望异步处理列表中的项目,非功能性要求是您希望一次处理的项目不超过三个。
关于后者,好的是,正如您已经在问题中展示的那样,Java 原生公开了一个很好地打包的 Executor
,即 运行 线程上的任务具有固定大小的池,优雅地允许您在使用线程时限制并发级别。
转向功能需求,Scala 通过将某些功能精确地作为其标准的一部分来提供帮助 API。特别是它使用 scala.concurrent.Future
,因此为了使用它,我们必须根据 Future
重新构造 triggerAPI
。该函数的内容不是特别相关,所以我们现在主要关注它的(修改后的)签名:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
Future {
// some code that takes some time to run...
}
请注意现在 triggerAPI
return 是 Future
。 Future
可以被认为是最终要计算的东西的读句柄。特别是,这是一个 Future[Unit]
,其中 Unit
代表 “我们并不特别关心这个函数的输出,但主要关心它的副作用”.
此外,请注意该方法现在采用 隐式参数 ,即 ExecutionContext
。 ExecutionContext
用于为 Future
提供某种形式的计算发生环境。 Scala 有一个 API 从 java.util.concurrent.ExecutorService
创建一个 ExecutionContext
,所以这将派上用场 运行 我们在固定线程池上的计算,运行ning在任何给定时间不超过三个回调。
在继续之前,如果您对 Future
s、ExecutionContext
s 和 隐式参数 有疑问,Scala 文档是您最好的知识来源(这里有一些建议:1, 2)。
现在我们有了新的 triggerAPI
方法,我们可以使用 Future.traverse
(here is the documentation for Scala 2.12 -- 在撰写本文时最新版本是 2.13,但尽我所能知识 Spark 用户暂时停留在 2.12。
tl;dr of Future.traverse
是它采用某种形式的容器和一个函数,该函数接收该容器中的项目,returns a Future
其他东西。该函数将应用于容器中的每个项目,结果将是结果容器的 Future
。在你的例子中:容器是 List
,物品是 (Int, String)
,而你 return 的其他东西是 Unit
.
这意味着您可以像这样简单地调用它:
Future.traverse(inpDS)(triggerAPI)
并且 triggerAPI
将应用于 inpDS
中的每个项目。
通过确保线程池支持的执行上下文在调用 Future.traverse
时处于隐式范围内,项目将使用所需的线程池进行处理。
调用的结果是Future[List[Unit]]
,这不是很有趣,可以简单地丢弃(因为你只对副作用感兴趣)。
说了很多,如果您想使用我描述的代码,可以这样做 here on Scastie。
供参考,这是整个实现:
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.duration.DurationLong
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
val datasets = List(
(1, "https://google.com/2X6barD"),
(2, "https://google.com/3d9vCgW"),
(3, "https://google.com/2M02Xz0"),
(4, "https://google.com/2XOu2uL"),
(5, "https://google.com/2AfBWF0"),
(6, "https://google.com/36AEKsw"),
(7, "https://google.com/3enBxz7"),
(8, "https://google.com/36ABq0x"),
(9, "https://google.com/2XBjmiF")
)
val executor: ExecutorService = Executors.newFixedThreadPool(3)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)
def triggerAPI(params: (Int, String))(implicit ec: ExecutionContext): Future[Unit] =
Future {
val (index, _) = params
println(s"+ started processing $index")
val start = System.nanoTime() / 1000000
Iterator.from(0).map(_ + 1).drop(100000000).take(1).toList.head // a noticeably slow operation
val end = System.nanoTime() / 1000000
val duration = (end - start).millis
println(s"- finished processing $index after $duration")
}
Future.traverse(datasets)(triggerAPI).onComplete {
case result =>
println("* processing is over, shutting down the executor")
executionContext.shutdown()
}