Apache Spark 性能问题
Apache Spark Performance Issue
我们考虑过使用 Apache Spark 来更快地匹配记录,但我们发现它比使用 select 语句匹配 SQL 效率低得多。
使用,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br>
Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
我们能够将大约 180 万条记录导入到存储在数据集对象中的 spark 环境中。
现在使用过滤功能
targetFileContent.filter(col("TARGETUPC").equalTo(upcValue))
上面的过滤器语句处于一个循环中,其中 upcValue 会针对大约 46k 个 ID 进行更新。
这个程序执行了几个小时,但我们使用 sql IN 运算符尝试了同样的操作,其中我们保留了不到一分钟执行的所有 46k UPC ID。
配置:
火花-sql 2.11
星火核心 2.11
JDK8
Windows10,单节点4核3Ghz,16GB内存。
C 盘 -> 12 GB 可用空间 space.
Eclipse -> 运行 配置 -> –Xms15000m.
如有错误请帮助我们分析理解,并建议我们需要做些什么来提高性能。
@Component("upcExactMatch")
public class UPCExactMatch {
@Autowired
private Environment envirnoment;
@Autowired
private LoadCSV loadCSV;
@Autowired
private SQLHandler sqlHandler;
public ArrayList<Row> perform(){
ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>();
ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>();
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(javaSparkContext);
SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate();
try{
Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext);
// load target from database
Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
System.out.println("File counts :"+sourceFileContent.count()+" : "+targetFileContent.count());
ArrayList<String> upcMatched = new ArrayList<String>();
ArrayList<String> partNumberMatched = new ArrayList<String>();
List<Row> sourceFileContents = sourceFileContent.collectAsList();
int upcColumnIndex=-1;
int itemIDColumnIndex=-1;
int partNumberTargetIndex=-1;
String upcValue="";
StructType schema = targetFileContent.schema();
List<Row> data = Arrays.asList();
Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema);
for(Row rowSourceFileContent: sourceFileContents){
upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC");
if(!rowSourceFileContent.isNullAt(upcColumnIndex)){
upcValue=rowSourceFileContent.get(upcColumnIndex).toString();
upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue));
if(upcMatchedRows.count() > 0){
for(Row upcMatchedRow: upcMatchedRows.collectAsList()){
partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER");
if(partNumberTargetIndex != -1){
upcMatched.add(upcValue);
partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString());
System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex));
}
}
}
}
}
for(int i=0;i<upcMatched.size();i++){
System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i));
}
}catch(Exception e){
e.printStackTrace();
}finally{
sparkSession.stop();
sqlContext.clearCache();
javaSparkContext.close();
}
return upcMatchedItemIDs;
}
}
尝试在两个数据集的数据帧之间进行匹配记录的内部连接。
我们考虑过使用 Apache Spark 来更快地匹配记录,但我们发现它比使用 select 语句匹配 SQL 效率低得多。
使用,
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("AIRecordLinkage").setMaster("local[*]"));<br>
Dataset<Row> sourceFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
我们能够将大约 180 万条记录导入到存储在数据集对象中的 spark 环境中。 现在使用过滤功能 targetFileContent.filter(col("TARGETUPC").equalTo(upcValue))
上面的过滤器语句处于一个循环中,其中 upcValue 会针对大约 46k 个 ID 进行更新。
这个程序执行了几个小时,但我们使用 sql IN 运算符尝试了同样的操作,其中我们保留了不到一分钟执行的所有 46k UPC ID。
配置:
火花-sql 2.11
星火核心 2.11
JDK8
Windows10,单节点4核3Ghz,16GB内存。
C 盘 -> 12 GB 可用空间 space.
Eclipse -> 运行 配置 -> –Xms15000m.
如有错误请帮助我们分析理解,并建议我们需要做些什么来提高性能。
@Component("upcExactMatch")
public class UPCExactMatch {
@Autowired
private Environment envirnoment;
@Autowired
private LoadCSV loadCSV;
@Autowired
private SQLHandler sqlHandler;
public ArrayList<Row> perform(){
ArrayList<Row> upcNonMatchedItemIDs=new ArrayList<Row>();
ArrayList<Row> upcMatchedItemIDs=new ArrayList<Row>();
JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(javaSparkContext);
SparkSession sparkSession = SparkSession.builder().appName("JavaStopWordshandlerTest").getOrCreate();
try{
Dataset<Row> sourceFileContent =loadCSV.load(sourceFileName,sourceFileLocation,javaSparkContext,sqlContext);
// load target from database
Dataset<Row> targetFileContent = spark.read().jdbc("jdbc:oracle:thin:@//Connection_IP/stage", "Database_name.Table_name", connectionProperties);
System.out.println("File counts :"+sourceFileContent.count()+" : "+targetFileContent.count());
ArrayList<String> upcMatched = new ArrayList<String>();
ArrayList<String> partNumberMatched = new ArrayList<String>();
List<Row> sourceFileContents = sourceFileContent.collectAsList();
int upcColumnIndex=-1;
int itemIDColumnIndex=-1;
int partNumberTargetIndex=-1;
String upcValue="";
StructType schema = targetFileContent.schema();
List<Row> data = Arrays.asList();
Dataset<Row> upcMatchedRows = sparkSession.createDataFrame(data, schema);
for(Row rowSourceFileContent: sourceFileContents){
upcColumnIndex=rowSourceFileContent.fieldIndex("Vendor UPC");
if(!rowSourceFileContent.isNullAt(upcColumnIndex)){
upcValue=rowSourceFileContent.get(upcColumnIndex).toString();
upcMatchedRows=targetFileContent.filter(col("TARGETUPC").equalTo(upcValue));
if(upcMatchedRows.count() > 0){
for(Row upcMatchedRow: upcMatchedRows.collectAsList()){
partNumberTargetIndex=upcMatchedRow.fieldIndex("PART_NUMBER");
if(partNumberTargetIndex != -1){
upcMatched.add(upcValue);
partNumberMatched.add(upcMatchedRow.get(partNumberTargetIndex).toString());
System.out.println("Source UPC : "+upcValue +"\tTarget part number :"+ upcMatchedRow.get(partNumberTargetIndex));
}
}
}
}
}
for(int i=0;i<upcMatched.size();i++){
System.out.println("Matched Exact UPC ids are :"+upcMatched.get(i) + "\t:Target\t"+partNumberMatched.get(i));
}
}catch(Exception e){
e.printStackTrace();
}finally{
sparkSession.stop();
sqlContext.clearCache();
javaSparkContext.close();
}
return upcMatchedItemIDs;
}
}
尝试在两个数据集的数据帧之间进行匹配记录的内部连接。