Substitute for thread executor pool in Scala
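My application needs multiple threads running to fetch data from different HDFS nodes. For that I am using a thread executor pool and forking threads.
Forking at: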
val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
pathSuffixList.foreach(block => {
  ConsumptionExecutor.execute(new Consumption(webHdfsUri, block))
})
My Consumption class:
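import java.net.URL
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.GenericDatumReader

class Consumption(webHdfsUri: String, block: Map[String, Any]) extends Runnable {
  override def run(): Unit = {
    // split takes a regex, so the question mark must be escaped as "\\?"
    val uriSplit = webHdfsUri.split("\\?")
    val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
    val inputStream = new URL(fileOpenUri).openStream()
    val datumReader = new GenericDatumReader[Void]()
    val dataStreamReader = new DataFileStream(inputStream, datumReader)
    // val schema = dataStreamReader.getSchema()
    try {
      val dataIterator = dataStreamReader.iterator()
      while (dataIterator.hasNext) {
        println(" data : " + dataStreamReader.next())
      }
    } finally {
      // closing the reader also closes the underlying input stream
      dataStreamReader.close()
    }
  }
}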
The consumption executor:
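import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicLong

object ConsumptionExecutor {
  val counter: AtomicLong = new AtomicLong()
  // cached pool whose threads are named ConsumptionExecutor-<n>
  val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread: Thread = new Thread(r)
      thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
      thread
    }
  })
  // cap the pool at 200 threads
  executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)
  def execute(trigger: Runnable): Unit = {
    executionContext.execute(trigger)
  }
}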
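But I want to use Akka Streams or Akka actors, so that I do not have to provide a fixed thread pool size and Akka takes care of everything.
I am pretty new to Akka and to the concepts of streaming and actors. Can someone give me any leads, in the form of sample code, that fit my use case?
Thanks in advance!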
One idea is to create a (subclass) instance of ActorPublisher for each HDFS node that you are reading from, and then Merge them in as multiple Sources in a FlowGraph.
Something like this pseudocode, where the details of the ActorPublisher sources are left out:
val g = PartialFlowGraph { implicit b =>
  import FlowGraphImplicits._
  val in1 = actorSource1
  val in2 = actorSource2
  // etc.
  val out = UndefinedSink[T]
  val merge = Merge[T]
  in1 ~> merge ~> out
  in2 ~> merge
  // etc.
}
For a collection of actor sources, this could be improved by iterating over them and adding an edge to merge for each one, but this gives the idea.
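As a rough illustration of merging a whole collection of sources rather than wiring each edge by hand, here is a minimal sketch. It does not use the FlowGraph DSL from the pseudocode above. It assumes the later Akka Streams API (akka-stream 2.6 or newer on the classpath) and swaps in flatMapMerge, which runs a bounded number of inner sources concurrently and merges their output. The names MergeSourcesSketch, recordsFor and readAll are hypothetical, and recordsFor only emits placeholder strings where a real version would open the WebHDFS URL and read Avro records.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeSourcesSketch {
  // Hypothetical stand-in for reading one HDFS file: it emits two placeholder
  // strings instead of opening a WebHDFS URL and decoding Avro records.
  def recordsFor(pathSuffix: String): Source[String, NotUsed] =
    Source(List(s"$pathSuffix#0", s"$pathSuffix#1"))

  // Merges one inner source per path suffix, with at most `breadth` of them
  // active at the same time. The order of merged elements is not guaranteed.
  def readAll(pathSuffixes: List[String], breadth: Int)(implicit system: ActorSystem): Seq[String] = {
    val merged = Source(pathSuffixes).flatMapMerge(breadth, suffix => recordsFor(suffix))
    // Blocking on the result keeps the sketch short; real code would stay asynchronous.
    Await.result(merged.runWith(Sink.seq), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("merge-sketch")
    try {
      val pathSuffixes = List("part-0.avro", "part-1.avro", "part-2.avro")
      readAll(pathSuffixes, breadth = 2).foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

The breadth argument plays the role that the maximum pool size played in ConsumptionExecutor: it limits how many files are read at once, while the threads themselves come from the dispatcher of the actor system. If the inner sources perform blocking I/O, as the URL-based reads in the question do, it is probably worth running them on a dedicated dispatcher so they do not starve the default one.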