Spark-SQL 连接两个具有相同列名的数据框/数据集
Spark-SQL Joining two dataframes/ datasets with same column name
我有以下两个数据集
controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status
我正在使用 Java spark api 加入它们,我只需要最终数据集中的特定列
private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};
controlSetDF.selectExpr(control_set_columns)
.join(accountDF.selectExpr(sf_account_columns),controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
"left_outer");
但是我得到以下错误
org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)
似乎有问题,因为两个数据框都有 merchant_id 列。
注意:如果我不使用 .selectExpr() 它工作正常。但它将显示来自第一和第二数据集的所有列。
您正在使用 sf_account_columns 中列出的列加入 DataFrame。该数组不包含您要加入的列,因此 DataFrame 也没有。将此列添加到提到的数组
如果两个DataFrame中的join columns名称相同,你可以简单的将其定义为join condition。在 Scala 中它更简洁一些,使用 Java 你需要将 Java List 转换为 Scala Seq:
Seq<String> joinColumns = scala.collection.JavaConversions
.asScalaBuffer(Lists.newArrayList("merchant_id"));
controlSetDF.selectExpr(control_set_columns)
.join(accountDF.selectExpr(sf_account_columns), joinColumns), "left_outer");
这将导致 DataFrame 只有一个连接列。
我有以下两个数据集
controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status
我正在使用 Java spark api 加入它们,我只需要最终数据集中的特定列
private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};
controlSetDF.selectExpr(control_set_columns)
.join(accountDF.selectExpr(sf_account_columns),controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
"left_outer");
但是我得到以下错误
org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)
似乎有问题,因为两个数据框都有 merchant_id 列。
注意:如果我不使用 .selectExpr() 它工作正常。但它将显示来自第一和第二数据集的所有列。
您正在使用 sf_account_columns 中列出的列加入 DataFrame。该数组不包含您要加入的列,因此 DataFrame 也没有。将此列添加到提到的数组
如果两个DataFrame中的join columns名称相同,你可以简单的将其定义为join condition。在 Scala 中它更简洁一些,使用 Java 你需要将 Java List 转换为 Scala Seq:
Seq<String> joinColumns = scala.collection.JavaConversions
.asScalaBuffer(Lists.newArrayList("merchant_id"));
controlSetDF.selectExpr(control_set_columns)
.join(accountDF.selectExpr(sf_account_columns), joinColumns), "left_outer");
这将导致 DataFrame 只有一个连接列。