如何在 Google Dataproc 集群上 运行 Java 并行算法?
How to run a Java parallel algorithm on Google Dataproc cluster?
我有一个使用 Spark 实现的简单 java 并行算法。但我不确定如何在 Google Dataproc 集群上 运行 它。我在网上找到了很多使用 python 或 scala 的资源,但对于 java 来说还不够。这是代码
public class Prime {
List<Integer> primes = new ArrayList<>();
//Method to calculate and count the prime numbers
public void countPrime(int n){
for (int i = 2; i < n; i++){
boolean isPrime = true;
//check if the number is prime or not
for (int j = 2; j < i; j++){
if (i % j == 0){
isPrime = false;
break; // exit the inner for loop
}
}
//add the primes into the List
if (isPrime){
primes.add(i);
}
}
}
//Main method to run the program
public static void main(String[]args){
//creating javaSparkContext object
SparkConf conf = new SparkConf().setAppName("haha").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//new prime object
Prime prime = new Prime();
prime.countPrime(100000);
//parallelize the collection
JavaRDD<Integer> rdd = sc.parallelize(prime.primes , 4);
long count = rdd.filter(e -> e == 2|| e % 2 != 0).count();
}
}
如果您的 jarfile 已经将“Prime”指定为 main-class,那么在基本级别上它就像:
gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jar prime-jarfile.jar
如果您有一个未指定 main-class 的 jarfile,您可以将 jarfile 作为“--jars”(末尾带有 's')提交并指定“--class”改为:
gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jars prime-jarfile.jar --class Prime
但是请注意,由于您指定 setMaster("local")
,这会覆盖集群自身的 spark 环境设置,并且它只会 运行 使用主节点上的线程。您只需要完全删除 .setMaster("local")
,它会自动在 Dataproc 集群中选择 YARN 配置,以便在多个工作节点上实际 运行。
此外,我意识到这只是一个入门练习,所以它可能并不重要,但您几乎肯定不会在真正的分布式模式中看到任何“加速”,因为:
- 与加载整数所花费的时间相比,使用 Spark 的计算太“便宜”了。
- 与启动远程执行的开销相比,正在处理的元素数量太少
- 分区的数量 (4) 可能太小而无法启动动态执行程序分配,因此它们可能最终 运行 一个接一个
所以您可能会看到更多“有趣”的结果,例如,如果您并行化的数字每个都代表较大的“范围”供工作人员检查;例如,如果数字“0”表示“计算 0 到 1,000,000 之间的素数”,“1”表示“计算 1,000,000 到 2,000,000 之间的素数”,等等。那么您可能会这样:
// Start with rdd is just parallelize the numbers 0 through 999 inclusive with something like 100 to 1000 "slices".
JavaRDD<Integer> countsPerRange = rdd.map(e -> countPrimesInRange(e*1000000, (e+1)*1000000));
int totalCount = countsPerRange.reduce((a, b) -> a + b);
我有一个使用 Spark 实现的简单 java 并行算法。但我不确定如何在 Google Dataproc 集群上 运行 它。我在网上找到了很多使用 python 或 scala 的资源,但对于 java 来说还不够。这是代码
public class Prime {
List<Integer> primes = new ArrayList<>();
//Method to calculate and count the prime numbers
public void countPrime(int n){
for (int i = 2; i < n; i++){
boolean isPrime = true;
//check if the number is prime or not
for (int j = 2; j < i; j++){
if (i % j == 0){
isPrime = false;
break; // exit the inner for loop
}
}
//add the primes into the List
if (isPrime){
primes.add(i);
}
}
}
//Main method to run the program
public static void main(String[]args){
//creating javaSparkContext object
SparkConf conf = new SparkConf().setAppName("haha").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//new prime object
Prime prime = new Prime();
prime.countPrime(100000);
//parallelize the collection
JavaRDD<Integer> rdd = sc.parallelize(prime.primes , 4);
long count = rdd.filter(e -> e == 2|| e % 2 != 0).count();
}
}
如果您的 jarfile 已经将“Prime”指定为 main-class,那么在基本级别上它就像:
gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jar prime-jarfile.jar
如果您有一个未指定 main-class 的 jarfile,您可以将 jarfile 作为“--jars”(末尾带有 's')提交并指定“--class”改为:
gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jars prime-jarfile.jar --class Prime
但是请注意,由于您指定 setMaster("local")
,这会覆盖集群自身的 spark 环境设置,并且它只会 运行 使用主节点上的线程。您只需要完全删除 .setMaster("local")
,它会自动在 Dataproc 集群中选择 YARN 配置,以便在多个工作节点上实际 运行。
此外,我意识到这只是一个入门练习,所以它可能并不重要,但您几乎肯定不会在真正的分布式模式中看到任何“加速”,因为:
- 与加载整数所花费的时间相比,使用 Spark 的计算太“便宜”了。
- 与启动远程执行的开销相比,正在处理的元素数量太少
- 分区的数量 (4) 可能太小而无法启动动态执行程序分配,因此它们可能最终 运行 一个接一个
所以您可能会看到更多“有趣”的结果,例如,如果您并行化的数字每个都代表较大的“范围”供工作人员检查;例如,如果数字“0”表示“计算 0 到 1,000,000 之间的素数”,“1”表示“计算 1,000,000 到 2,000,000 之间的素数”,等等。那么您可能会这样:
// Start with rdd is just parallelize the numbers 0 through 999 inclusive with something like 100 to 1000 "slices".
JavaRDD<Integer> countsPerRange = rdd.map(e -> countPrimesInRange(e*1000000, (e+1)*1000000));
int totalCount = countsPerRange.reduce((a, b) -> a + b);