Apache Spark 性能问题

Apache Spark Performance Issue

我们考虑过使用 Apache Spark 来更快地匹配记录,但我们发现它比使用 select 语句匹配 SQL 效率低得多。
使用,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br> Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);

我们能够将大约 180 万条记录导入到存储在数据集对象中的 spark 环境中。 现在使用过滤功能 targetFileContent.filter(col("TARGETUPC").equalTo(upcValue))

上面的过滤器语句处于一个循环中,其中 upcValue 会针对大约 46k 个 ID 进行更新。

这个程序执行了几个小时,但我们使用 sql IN 运算符尝试了同样的操作,其中我们保留了不到一分钟执行的所有 46k UPC ID。

配置:
火花-sql 2.11
星火核心 2.11
JDK8
Windows10,单节点4核3Ghz,16GB内存。
C 盘 -> 12 GB 可用空间 space.
Eclipse -> 运行 配置 -> –Xms15000m.

如有错误请帮助我们分析理解,并建议我们需要做些什么来提高性能。

    @Component("upcExactMatch")
    public class UPCExactMatch {
        @Autowired
        private Environment envirnoment;

        @Autowired
        private LoadCSV loadCSV;

        @Autowired
        private SQLHandler sqlHandler;

        public ArrayList<Row> perform(){

            ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>();
            ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>();

            JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
            SQLContext sqlContext = new SQLContext(javaSparkContext);
            SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate();

            try{
                Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext);

                // load target from database
                Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
                System.out.println("File counts :"+sourceFileContent.count()+"  :  "+targetFileContent.count());

                ArrayList<String> upcMatched = new ArrayList<String>();
                ArrayList<String> partNumberMatched = new ArrayList<String>();

                List<Row> sourceFileContents = sourceFileContent.collectAsList();

                int upcColumnIndex=-1;
                int itemIDColumnIndex=-1;
                int partNumberTargetIndex=-1;
                String upcValue="";

                StructType schema = targetFileContent.schema();
                List<Row> data = Arrays.asList();
                Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema);

                for(Row rowSourceFileContent: sourceFileContents){

                    upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC");

                    if(!rowSourceFileContent.isNullAt(upcColumnIndex)){

                        upcValue=rowSourceFileContent.get(upcColumnIndex).toString();
                        upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue));

                        if(upcMatchedRows.count() > 0){

                            for(Row upcMatchedRow: upcMatchedRows.collectAsList()){
                                partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER");

                                if(partNumberTargetIndex != -1){
                                    upcMatched.add(upcValue);
                                    partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString());
                                    System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex));

                                }
                            }

                        }

                    }

                }

                for(int i=0;i<upcMatched.size();i++){
                    System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i));

                }

            }catch(Exception e){
                e.printStackTrace();
            }finally{
                sparkSession.stop();
                sqlContext.clearCache();
                javaSparkContext.close();
            }

            return upcMatchedItemIDs;

        }

    }

尝试在两个数据集的数据帧之间进行匹配记录的内部连接。