如何使用单个 spark 上下文在 Apache Spark 中 运行 并发作业(操作)
How to run concurrent jobs(actions) in Apache Spark using single spark context
它在 Apache Spark 文档中说“在每个 Spark 应用程序中,如果多个“作业”(Spark 操作)是由不同线程提交的,则它们可能 运行 同时 ”。有人可以解释如何为以下示例代码实现这种并发吗?
SparkConf conf = new SparkConf().setAppName("Simple_App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
System.out.println(file1.count());
System.out.println(file2.count());
这两项工作是独立的,必须运行同时进行。
谢谢。
尝试这样的事情:
final JavaSparkContext sc = new JavaSparkContext("local[2]","Simple_App");
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Start thread 1
Future<Long> future1 = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
return file1.count();
}
});
// Start thread 2
Future<Long> future2 = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
return file2.count();
}
});
// Wait thread 1
System.out.println("File1:"+future1.get());
// Wait thread 2
System.out.println("File2:"+future2.get());
使用 Scala 并行集合功能
Range(0,10).par.foreach {
project_id =>
{
spark.table("store_sales").selectExpr(project_id+" as project_id", "count(*) as cnt")
.write
.saveAsTable(s"counts_$project_id")
}
}
PS。以上将启动多达 10 个并行的 Spark 作业,但它可能会更少,具体取决于 Spark Driver 上的可用内核数量。 GQ使用Futures的上述方法在这方面更灵活。
它在 Apache Spark 文档中说“在每个 Spark 应用程序中,如果多个“作业”(Spark 操作)是由不同线程提交的,则它们可能 运行 同时 ”。有人可以解释如何为以下示例代码实现这种并发吗?
SparkConf conf = new SparkConf().setAppName("Simple_App");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
System.out.println(file1.count());
System.out.println(file2.count());
这两项工作是独立的,必须运行同时进行。
谢谢。
尝试这样的事情:
final JavaSparkContext sc = new JavaSparkContext("local[2]","Simple_App");
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Start thread 1
Future<Long> future1 = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
return file1.count();
}
});
// Start thread 2
Future<Long> future2 = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
return file2.count();
}
});
// Wait thread 1
System.out.println("File1:"+future1.get());
// Wait thread 2
System.out.println("File2:"+future2.get());
使用 Scala 并行集合功能
Range(0,10).par.foreach {
project_id =>
{
spark.table("store_sales").selectExpr(project_id+" as project_id", "count(*) as cnt")
.write
.saveAsTable(s"counts_$project_id")
}
}
PS。以上将启动多达 10 个并行的 Spark 作业,但它可能会更少,具体取决于 Spark Driver 上的可用内核数量。 GQ使用Futures的上述方法在这方面更灵活。