如何在 Google Dataproc 集群上 运行 Java 并行算法?

How to run a Java parallel algorithm on Google Dataproc cluster?

我有一个使用 Spark 实现的简单 java 并行算法。但我不确定如何在 Google Dataproc 集群上 运行 它。我在网上找到了很多使用 python 或 scala 的资源,但对于 java 来说还不够。这是代码

public class Prime {

    List<Integer> primes = new ArrayList<>();

    //Method to calculate and count the prime numbers
    public void countPrime(int n){
        for (int i = 2; i < n; i++){
            boolean  isPrime = true;

            //check if the number is prime or not
            for (int j = 2; j < i; j++){
                if (i % j == 0){
                    isPrime = false;
                    break;  // exit the inner for loop
                }
            }

            //add the primes into the List
            if (isPrime){
                primes.add(i);
            }
        }
    }

    //Main method to run the program
    public static void main(String[]args){
        //creating javaSparkContext object
        SparkConf conf = new SparkConf().setAppName("haha").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //new prime object
        Prime prime = new Prime();
        prime.countPrime(100000);

        //parallelize the collection
        JavaRDD<Integer> rdd = sc.parallelize(prime.primes , 4);
        long count = rdd.filter(e  -> e == 2|| e % 2 != 0).count(); 
    }
}

如果您的 jarfile 已经将“Prime”指定为 main-class,那么在基本级别上它就像:

gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jar prime-jarfile.jar

如果您有一个未指定 main-class 的 jarfile,您可以将 jarfile 作为“--jars”(末尾带有 's')提交并指定“--class”改为:

gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} --jars prime-jarfile.jar --class Prime

但是请注意,由于您指定 setMaster("local"),这会覆盖集群自身的 spark 环境设置,并且它只会 运行 使用主节点上的线程。您只需要完全删除 .setMaster("local"),它会自动在 Dataproc 集群中选择 YARN 配置,以便在多个工作节点上实际 运行。

此外,我意识到这只是一个入门练习,所以它可能并不重要,但您几乎肯定不会在真正的分布式模式中看到任何“加速”,因为:

  1. 与加载整数所花费的时间相比,使用 Spark 的计算太“便宜”了。
  2. 与启动远程执行的开销相比,正在处理的元素数量太少
  3. 分区的数量 (4) 可能太小而无法启动动态执行程序分配,因此它们可能最终 运行 一个接一个

所以您可能会看到更多“有趣”的结果,例如,如果您并行化的数字每个都代表较大的“范围”供工作人员检查;例如,如果数字“0”表示“计算 0 到 1,000,000 之间的素数”,“1”表示“计算 1,000,000 到 2,000,000 之间的素数”,等等。那么您可能会这样:

// Start with rdd is just parallelize the numbers 0 through 999 inclusive with something like 100 to 1000 "slices".
JavaRDD<Integer> countsPerRange = rdd.map(e -> countPrimesInRange(e*1000000, (e+1)*1000000));
int totalCount = countsPerRange.reduce((a, b) -> a + b);