Unable to write multiple queries in foreachBatch


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped in Scala string literals
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    // Read the stream from Kafka, starting from the earliest offsets
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest")
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload into typed columns
    val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    personDF.createOrReplaceTempView("persontab")

    /* spark.sql("""select min(stock_price) as min_stock_price,
         max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
       .writeStream
       .format("console")
       .outputMode("complete")
       .start()
       .awaitTermination() */

    spark.sql("""select min(stock_price) as min_stock_price,
      max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream.outputMode("complete").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved")
      }
      .start()
      .awaitTermination()
  }
}

I want to run three queries. One is the aggregation done in the code above, and it works fine. The other two are where-clause queries. How do I run those two queries here as well? Also, can the results of all three queries be saved in a single table, or do I need to save them in separate tables? Please let me know how to do it both ways. (A sketch of the single-table option follows below.)
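For the single-table variant, one possible approach (a minimal sketch, not part of the original code) is to tag each result DataFrame with a label column and union them before a single JDBC append. The writeAllToOneTable helper, the query_label column, and the all_query_results table are hypothetical names, and unionByName with allowMissingColumns requires Spark 3.1+:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def writeAllToOneTable(aggDF: DataFrame, currentDF: DataFrame, yesterdayDF: DataFrame): Unit = {
  // Tag every result so rows from different queries can be told apart in one table
  val labelled = Seq(
    aggDF.withColumn("query_label", lit("min_max_avg")),
    currentDF.withColumn("query_label", lit("current_price")),
    yesterdayDF.withColumn("query_label", lit("yesterday_price"))
  )
  // unionByName(allowMissingColumns = true) (Spark 3.1+) pads absent columns with nulls,
  // since the three queries project different column sets
  val combined = labelled.reduce(_.unionByName(_, allowMissingColumns = true))
  combined.write.format("jdbc")
    .mode("append")
    .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
    .option("dbtable", "all_query_results")
    .option("user", "root")
    .option("password", "root")
    .save()
}

Called from inside foreachBatch, this would replace the three separate JDBC appends; whether one wide, sparsely populated table or three narrow ones is preferable depends on how the results are queried downstream.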

package mysql.kafka.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{min, max, avg}

object multiplequerieskafkastreaming {

  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped in Scala string literals
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    // Read the stream from Kafka, starting from the earliest offsets
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo3")
      .option("startingOffsets", "earliest")
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload into typed columns
    val stockDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    // Compute yesterday's date as dd-MM-yyyy in UTC
    val yesterday = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
    val formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy")
    val yesterdaydate = formatter.format(yesterday)

    stockDF
      .writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        // Cache the micro-batch so the three queries below don't recompute it
        batchDF.persist()
        batchDF.createOrReplaceTempView("stocktab")
        println(yesterdaydate)

        val Current_stock_pricedf = batchDF.sparkSession.sql(
          "select stock_name,stock_price from stocktab where stock_name='abc'")
        println("select stock_name,stock_price from stocktab where stock_name='abc'")
        Current_stock_pricedf.show()

        val min_max_df = batchDF.sparkSession.sql("""select min(stock_price) as min_stock_price,
          max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from stocktab""")
        min_max_df.show()

        val Yesterday_stock_pricedf = batchDF.sparkSession.sql(
          "select stock_name,stock_price,date from stocktab where date='" + yesterdaydate + "'")
        Yesterday_stock_pricedf.show()

        min_max_df.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved min_max_df")

        Current_stock_pricedf.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "current_stock_price")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved Current_stock_pricedf")

        Yesterday_stock_pricedf.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "Yesterday_stock_price")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved Yesterday_stock_pricedf")

        // Unpersist only once, after all three writes have finished
        batchDF.unpersist()
      }
      .start()
      .awaitTermination()
  }
}

The code works fine now. I created a temp table and ran multiple queries inside foreachBatch with the help of df.sparkSession.sql(""). For reference: . Some exception handling still needs to be added to the code to guard against failures; apart from that, the main code works fine. One way to add it is sketched below.
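For that exception handling, a minimal sketch (not from the original code) is to wrap the foreachBatch body in try/catch/finally, so the cached micro-batch is always unpersisted and a failed JDBC write fails the batch instead of being silently swallowed:

stockDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  try {
    batchDF.createOrReplaceTempView("stocktab")
    // ... run the three queries and JDBC writes shown above ...
  } catch {
    case e: Exception =>
      // Log, then rethrow: after a restart from the checkpoint the
      // micro-batch is reprocessed instead of being marked committed
      println(s"batch $batchId failed: ${e.getMessage}")
      throw e
  } finally {
    // Release the cache whether the batch succeeded or failed
    batchDF.unpersist()
  }
}
.start()
.awaitTermination()

Note that foreachBatch gives at-least-once guarantees and the three JDBC appends are not one transaction, so a retried batch can write duplicate rows; storing batchId in a column and deduplicating on it is one common mitigation.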

Thanks @OneCricketeer for the suggestion.