Spark Sql 2.3 - DataFrame - SaveMode.Append - issue

Could you please advise whether the following approach is correct? I am new to Spark, and I want to insert data into an existing table.

    Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema);

    if (spark.catalog().tableExists("mylogs")) {
      logDataFrame.write().mode("append").insertInto("mylogs");// exception

    } else {
        logDataFrame.createOrReplaceTempView("mylogs"); // This is working fine
    }

    Dataset<Row> results = spark.sql("SELECT count(a1) FROM mylogs");

Getting the below exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
+- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:352)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:350)

Code edited as per the comments:

    Dataset<Row> logDataFrame = sparkSession.createDataFrame(rowRDD, schema);

    if (sparkSession.catalog().tableExists("mylogs")) {
        logDataFrame.registerTempTable("temptable");
        sparkSession.sql("insert into table mylogs select * from temptable");
       //logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
    } else {
        logDataFrame.createOrReplaceTempView("mylogs");
    }

    Dataset<Row> results = sparkSession.sql("SELECT count(a1) FROM mylogs");

Getting the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
+- Project [a1#22, b1#23, c1#24, d1#25]
   +- SubqueryAlias temptable
      +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)

First register your dataframe as a tempTable:

logDataFrame.registerTempTable("temptable")

Then replace the statement that throws the exception with:

sqlContext.sql("insert into table mylogs select * from temptable")

You can create a Spark Dataset from a text file using the SparkSession API.

Based on the data sample you provided in the comments, I created a POJO called Log.

public class Log implements Serializable{

    private String col1;
    private String col2;
    private String col3;
    private String col4;
    private String col5;
    private String col6;
    private String col7;

    // getters and setters here

}

Using this, I applied a map transformation to convert each log line into a Log object.

public class LogToDataset {

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder().appName("Log Job").master("spark://localhost:7077")
                .getOrCreate();

        Dataset<String> textDF = spark.read()
                .text("C:\\Users\\log4jFile.txt")
                .as(Encoders.STRING());

        JavaRDD<Log> logRDD = textDF.toJavaRDD().map(line -> {
            String[] data = line.split(" ");
            Log log = new Log();
            log.setCol1(data[0]);
            log.setCol2(data[1]);
            log.setCol3(data[2]);
            log.setCol4(data[3]);
            log.setCol5(data[4]);
            log.setCol6(data[5]);
            log.setCol7(data[6]);
            return log;
        });

        Dataset<Row> logDataset = spark.createDataFrame(logRDD, Log.class);
        logDataset.write().mode(SaveMode.Append).insertInto("hivelogtable");
        logDataset.createOrReplaceTempView("logtable");
        spark.sql("select * from logtable").show();
    }

}

Now you should be able to insert data into the table using insertInto() or saveAsTable(), as others mentioned in their comments.

Here is the sample data used to test this code:

10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512] 
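The map above assumes each line splits into exactly seven space-separated tokens. A minimal, Spark-free sketch of just that parsing step (the class and method names here are mine, chosen for illustration; the sample line is taken from the data above):

```java
import java.util.Arrays;

public class LogLineSplitDemo {

    // Splits one log line into the seven fields the Log POJO expects.
    static String[] splitLine(String line) {
        return line.trim().split(" ");
    }

    public static void main(String[] args) {
        String sample = "10-16 14:45:08.117 11342 30575 V Musicplayer: [Coming::512]";
        String[] fields = splitLine(sample);
        System.out.println(fields.length);           // 7
        System.out.println(Arrays.toString(fields)); // [10-16, 14:45:08.117, 11342, 30575, V, Musicplayer:, [Coming::512]]
    }
}
```

If a log line ever contains extra spaces or fewer tokens, `data[6]` in the map would throw an `ArrayIndexOutOfBoundsException`, so it is worth validating `fields.length` before building the Log object.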

Final output:

This is the output when I query the Hive table.

+-----+------------+-----+-----+----+------------+-------------+
| col1|        col2| col3| col4|col5|        col6|         col7|
+-----+------------+-----+-----+----+------------+-------------+
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
|10-16|14:45:08.117|11342|30575|   V|Musicplayer:|[Coming::512]|
+-----+------------+-----+-----+----+------------+-------------+

I hope this helps someone. You need to check whether the 'mylogs' table exists first; otherwise, if we try to use 'Append' directly at runtime, it throws an exception saying the 'mylogs' table does not exist.

And there is no need to shuffle data through a 'temp' table.

    logDataFrame = spark.createDataFrame(rowRDD, schema);

    if (spark.catalog().tableExists("mylogs")) {
        logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
    } else {
        logDataFrame.createOrReplaceTempView("mylogs");
    }