Spark SQL 2.3 - DataFrame - SaveMode.Append - issue
Could you please advise whether the following approach is correct? I am new to Spark, and I want to insert data into an existing table.
Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema);
if (spark.catalog().tableExists("mylogs")) {
    logDataFrame.write().mode("append").insertInto("mylogs"); // exception
} else {
    logDataFrame.createOrReplaceTempView("mylogs"); // this is working fine
}
Dataset<Row> results = spark.sql("SELECT count(a1) FROM mylogs");
This throws the exception below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
+- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:352)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:350)
Code edited based on the comments:
Dataset<Row> logDataFrame = sparkSession.createDataFrame(rowRDD, schema);
if (sparkSession.catalog().tableExists("mylogs")) {
    logDataFrame.registerTempTable("temptable");
    sparkSession.sql("insert into table mylogs select * from temptable");
    //logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
} else {
    logDataFrame.createOrReplaceTempView("mylogs");
}
Dataset<Row> results = sparkSession.sql("SELECT count(a1) FROM mylogs");
This fails with the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
+- Project [a1#22, b1#23, c1#24, d1#25]
+- SubqueryAlias temptable
+- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
First register your DataFrame as a temp table (registerTempTable is deprecated in Spark 2.x; createOrReplaceTempView is its current equivalent):
logDataFrame.registerTempTable("temptable")
then replace the statement that throws the exception with:
sqlContext.sql("insert into table mylogs select * from temptable")
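For reference, a minimal sketch of the same approach against the Spark 2.3 Java API (reusing the sparkSession and logDataFrame from the question; table and view names are taken from the snippets above):

// Expose the new rows as a temporary view (the non-deprecated API).
logDataFrame.createOrReplaceTempView("temptable");
// Insert them into the target table via SQL. Note this only succeeds
// when "mylogs" is a persistent (e.g. Hive) table; if "mylogs" is
// itself a temp view, the same AnalysisException shown above is raised.
sparkSession.sql("insert into table mylogs select * from temptable");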
You can create a Spark Dataset from a text file using the SparkSession API.
Based on the data sample you provided in the comments, I created a POJO named Log:
public class Log implements Serializable {
    private String col1;
    private String col2;
    private String col3;
    private String col4;
    private String col5;
    private String col6;
    private String col7;
    // getters and setters here
}
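Note that for spark.createDataFrame(logRDD, Log.class) to infer the schema by reflection, Log must follow JavaBean conventions: a public no-argument constructor plus public getters and setters for each column field.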
Using this, I applied a map to convert each log line into a Log object:
public class LogToDataset {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Log Job")
                .master("spark://localhost:7077")
                .getOrCreate();

        // Read the raw log file as a Dataset of lines.
        Dataset<String> textDF = spark.read()
                .text("C:\\Users\\log4jFile.txt")
                .as(Encoders.STRING());

        // Split each line on spaces and map it to a Log object.
        JavaRDD<Log> logRDD = textDF.toJavaRDD().map(line -> {
            String[] data = line.split(" ");
            Log log = new Log();
            log.setCol1(data[0]);
            log.setCol2(data[1]);
            log.setCol3(data[2]);
            log.setCol4(data[3]);
            log.setCol5(data[4]);
            log.setCol6(data[5]);
            log.setCol7(data[6]);
            return log;
        });

        // Build a DataFrame from the bean class and append it to the Hive
        // table (this assumes "hivelogtable" already exists in the metastore).
        Dataset<Row> logDataset = spark.createDataFrame(logRDD, Log.class);
        logDataset.write().mode(SaveMode.Append).insertInto("hivelogtable");

        logDataset.createOrReplaceTempView("logtable");
        spark.sql("select * from logtable").show();
    }
}
Now you should be able to insert data into the table using insertInto() or saveAsTable(), as others have mentioned in the comments.
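If the target table does not exist yet, insertInto() will fail, so one option (an illustrative sketch reusing the hivelogtable name from the example above, not part of the original answer) is to create it on the first write with saveAsTable():

if (spark.catalog().tableExists("hivelogtable")) {
    // Table exists: append the new rows by position.
    logDataset.write().mode(SaveMode.Append).insertInto("hivelogtable");
} else {
    // First run: create the managed table from the DataFrame's schema
    // and write the current batch in one step.
    logDataset.write().saveAsTable("hivelogtable");
}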
Here is the sample data used to test this code:
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]
Final output:
This is the output when I query the Hive table:
+-----+------------+-----+-----+----+------------+-------------+
| col1| col2| col3| col4|col5| col6| col7|
+-----+------------+-----+-----+----+------------+-------------+
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575| V|Musicplayer:|[Coming::512]|
+-----+------------+-----+-----+----+------------+-------------+
I hope this helps someone. You need to check whether the 'mylogs' table exists; otherwise, if we call 'Append' directly at runtime, it throws an exception saying the 'mylogs' table does not exist. This also avoids shuffling data through a 'temp' table.
Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema);
if (spark.catalog().tableExists("mylogs")) {
    logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
} else {
    logDataFrame.createOrReplaceTempView("mylogs");
}
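One caveat with this version: the else branch only registers a temporary view, so the data never lands in a persistent table, and a later append into "mylogs" would hit the same AnalysisException. A possible variant (an illustration assuming a Hive-enabled SparkSession, not part of the original answer) is to create the table on the first run instead:

Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema);
if (spark.catalog().tableExists("mylogs")) {
    // Append into the existing persistent table.
    logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
} else {
    // First run: create "mylogs" as a managed table so that later
    // runs can append to it with insertInto().
    logDataFrame.write().saveAsTable("mylogs");
}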