无法在 foreachBatch 中执行多个查询并写入结果
Unable to run and write multiple queries in foreachBatch
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
/**
 * Structured Streaming job that reads JSON stock records from the Kafka topic
 * `demo2`, aggregates min / max / avg of `stock_price` over the stream, and
 * appends the aggregate row produced by every micro-batch to the MySQL table
 * `max_min_avg`.
 */
object SparkStreamingKafka1 {

  /**
   * Entry point: builds the local Spark session, wires the Kafka source to the
   * JDBC sink and blocks until the streaming query terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped inside a regular string literal; the
    // unescaped form "C:\hadoop\" is not a valid literal and does not compile.
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    // Silences all Spark logging; exceptions still surface via awaitTermination().
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    // The Kafka `value` column is binary; cast it to a string so it can be parsed as JSON.
    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload and flatten the struct into top-level columns.
    val personDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    personDF.createOrReplaceTempView("persontab")

    // For debugging, the same query can be sent to the console sink instead:
    //   .writeStream.format("console").outputMode("complete").start().awaitTermination()
    spark.sql(
      """select min(stock_price) as min_stock_price,
        |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from persontab""".stripMargin)
      .writeStream
      // A streaming aggregation without a watermark must use complete (or update) mode.
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        // The "header" option was dropped: it is a CSV option and has no meaning
        // for the JDBC sink. The driver class is resolved from the URL; if it has
        // to be set explicitly, the class name is case-sensitive.
        batchDF.write
          .format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved")
      }
      .start()
      .awaitTermination()
  }
}
我要执行三个查询。其中一个是聚合查询,已在上面的代码中实现,并且工作正常;另外两个是带 WHERE 子句的查询。如何在 foreachBatch 中同时完成这两个查询?三个查询的结果可以保存到同一张表中,还是必须分别保存到不同的表中?请告诉我这两种方式各自的做法。
package mysql.kafka.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{min, max, avg}
/**
 * Structured Streaming job that reads JSON stock records from the Kafka topic
 * `demo3` and, for every micro-batch, runs three SQL queries against the batch
 * and appends each result to its own MySQL table:
 *
 *  - rows for stock `abc`                 -> `current_stock_price`
 *  - min / max / avg of `stock_price`     -> `max_min_avg`
 *  - rows whose `date` equals yesterday   -> `Yesterday_stock_price`
 */
object multiplequerieskafkastreaming {

  /**
   * Entry point: builds the local Spark session, parses the Kafka payload and
   * hands every micro-batch to [[processBatch]]. Blocks until the query ends.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped inside a regular string literal; the
    // unescaped form "C:\hadoop\" is not a valid literal and does not compile.
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    // Silences all Spark logging; exceptions still surface via awaitTermination().
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo3")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    // The Kafka `value` column is binary; cast it to a string so it can be parsed as JSON.
    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload and flatten the struct into top-level columns.
    val stockDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    stockDF.writeStream
      .foreachBatch(processBatch _)
      .start()
      .awaitTermination()
  }

  /** Yesterday's date in UTC, formatted as `dd-MM-yyyy`. */
  private def yesterdayUtc(): String =
    DateTimeFormatter.ofPattern("dd-MM-yyyy").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

  /**
   * Runs the three queries on one micro-batch and writes each result to MySQL.
   *
   * @param batchDF the rows of the current micro-batch
   * @param batchId identifier assigned to the micro-batch by Spark (unused)
   */
  private def processBatch(batchDF: DataFrame, batchId: Long): Unit = {
    println("inside the foreachbatch1")

    // Computed per batch: a value captured once at start-up would go stale as
    // soon as the stream runs past midnight.
    val yesterdayDate = yesterdayUtc()
    println(yesterdayDate)

    // Cache the batch so the three queries and writes reuse it instead of
    // recomputing it from the source each time.
    batchDF.persist()
    try {
      batchDF.createOrReplaceTempView("stocktab")
      val session = batchDF.sparkSession

      val currentStockSql = "select stock_name,stock_price from stocktab where stock_name='abc'"
      val currentStockPriceDF = session.sql(currentStockSql)
      println(currentStockSql)
      currentStockPriceDF.show()

      val minMaxDF = session.sql(
        """select min(stock_price) as min_stock_price,
          |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from stocktab""".stripMargin)
      minMaxDF.show()

      // yesterdayDate is generated locally from the clock, never from input data.
      val yesterdayStockPriceDF = session.sql(
        s"select stock_name,stock_price,date from stocktab where date='$yesterdayDate'")
      yesterdayStockPriceDF.show()

      writeToMySql(minMaxDF, "max_min_avg")
      println("saved min_max_df")

      writeToMySql(currentStockPriceDF, "current_stock_price")
      println("saved Current_stock_pricedf")

      writeToMySql(yesterdayStockPriceDF, "Yesterday_stock_price")
      println("saved Yesterday_stock_pricedf")
    } finally {
      // Release the cache exactly once, after ALL writes have finished and even
      // when one of them throws. Previously it was released before the last write.
      batchDF.unpersist()
    }
  }

  /**
   * Appends `df` to the given table of the `gkstorespipelinedb` MySQL database.
   *
   * The "header" option was dropped: it is a CSV option and has no meaning for
   * the JDBC sink.
   *
   * @param df    rows to append
   * @param table name of the target table
   */
  private def writeToMySql(df: DataFrame, table: String): Unit =
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
      .option("dbtable", table)
      .option("user", "root")
      .option("password", "root")
      .save()
}
上面的代码运行良好:在 foreachBatch 中先用 batchDF.createOrReplaceTempView(...) 为当前批次创建临时视图,再通过 batchDF.sparkSession.sql(...) 对该视图执行多个查询。供参考(原文此处的参考链接已缺失)。
需要在代码中加入一些异常处理,避免出错。除此之外,主要代码工作正常。
谢谢 @OneCricketeer 的建议。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
/**
 * Structured Streaming job that reads JSON stock records from the Kafka topic
 * `demo2`, aggregates min / max / avg of `stock_price` over the stream, and
 * appends the aggregate row produced by every micro-batch to the MySQL table
 * `max_min_avg`.
 */
object SparkStreamingKafka1 {

  /**
   * Entry point: builds the local Spark session, wires the Kafka source to the
   * JDBC sink and blocks until the streaming query terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped inside a regular string literal; the
    // unescaped form "C:\hadoop\" is not a valid literal and does not compile.
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    // Silences all Spark logging; exceptions still surface via awaitTermination().
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    // The Kafka `value` column is binary; cast it to a string so it can be parsed as JSON.
    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload and flatten the struct into top-level columns.
    val personDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    personDF.createOrReplaceTempView("persontab")

    // For debugging, the same query can be sent to the console sink instead:
    //   .writeStream.format("console").outputMode("complete").start().awaitTermination()
    spark.sql(
      """select min(stock_price) as min_stock_price,
        |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from persontab""".stripMargin)
      .writeStream
      // A streaming aggregation without a watermark must use complete (or update) mode.
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        // The "header" option was dropped: it is a CSV option and has no meaning
        // for the JDBC sink. The driver class is resolved from the URL; if it has
        // to be set explicitly, the class name is case-sensitive.
        batchDF.write
          .format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .save()
        println("saved")
      }
      .start()
      .awaitTermination()
  }
}
我要执行三个查询。其中一个是聚合查询,已在上面的代码中实现,并且工作正常;另外两个是带 WHERE 子句的查询。如何在 foreachBatch 中同时完成这两个查询?三个查询的结果可以保存到同一张表中,还是必须分别保存到不同的表中?请告诉我这两种方式各自的做法。
package mysql.kafka.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{min, max, avg}
/**
 * Structured Streaming job that reads JSON stock records from the Kafka topic
 * `demo3` and, for every micro-batch, runs three SQL queries against the batch
 * and appends each result to its own MySQL table:
 *
 *  - rows for stock `abc`                 -> `current_stock_price`
 *  - min / max / avg of `stock_price`     -> `max_min_avg`
 *  - rows whose `date` equals yesterday   -> `Yesterday_stock_price`
 */
object multiplequerieskafkastreaming {

  /**
   * Entry point: builds the local Spark session, parses the Kafka payload and
   * hands every micro-batch to [[processBatch]]. Blocks until the query ends.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped inside a regular string literal; the
    // unescaped form "C:\hadoop\" is not a valid literal and does not compile.
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    // Silences all Spark logging; exceptions still surface via awaitTermination().
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo3")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    // The Kafka `value` column is binary; cast it to a string so it can be parsed as JSON.
    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    // Parse the JSON payload and flatten the struct into top-level columns.
    val stockDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    stockDF.writeStream
      .foreachBatch(processBatch _)
      .start()
      .awaitTermination()
  }

  /** Yesterday's date in UTC, formatted as `dd-MM-yyyy`. */
  private def yesterdayUtc(): String =
    DateTimeFormatter.ofPattern("dd-MM-yyyy").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

  /**
   * Runs the three queries on one micro-batch and writes each result to MySQL.
   *
   * @param batchDF the rows of the current micro-batch
   * @param batchId identifier assigned to the micro-batch by Spark (unused)
   */
  private def processBatch(batchDF: DataFrame, batchId: Long): Unit = {
    println("inside the foreachbatch1")

    // Computed per batch: a value captured once at start-up would go stale as
    // soon as the stream runs past midnight.
    val yesterdayDate = yesterdayUtc()
    println(yesterdayDate)

    // Cache the batch so the three queries and writes reuse it instead of
    // recomputing it from the source each time.
    batchDF.persist()
    try {
      batchDF.createOrReplaceTempView("stocktab")
      val session = batchDF.sparkSession

      val currentStockSql = "select stock_name,stock_price from stocktab where stock_name='abc'"
      val currentStockPriceDF = session.sql(currentStockSql)
      println(currentStockSql)
      currentStockPriceDF.show()

      val minMaxDF = session.sql(
        """select min(stock_price) as min_stock_price,
          |max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from stocktab""".stripMargin)
      minMaxDF.show()

      // yesterdayDate is generated locally from the clock, never from input data.
      val yesterdayStockPriceDF = session.sql(
        s"select stock_name,stock_price,date from stocktab where date='$yesterdayDate'")
      yesterdayStockPriceDF.show()

      writeToMySql(minMaxDF, "max_min_avg")
      println("saved min_max_df")

      writeToMySql(currentStockPriceDF, "current_stock_price")
      println("saved Current_stock_pricedf")

      writeToMySql(yesterdayStockPriceDF, "Yesterday_stock_price")
      println("saved Yesterday_stock_pricedf")
    } finally {
      // Release the cache exactly once, after ALL writes have finished and even
      // when one of them throws. Previously it was released before the last write.
      batchDF.unpersist()
    }
  }

  /**
   * Appends `df` to the given table of the `gkstorespipelinedb` MySQL database.
   *
   * The "header" option was dropped: it is a CSV option and has no meaning for
   * the JDBC sink.
   *
   * @param df    rows to append
   * @param table name of the target table
   */
  private def writeToMySql(df: DataFrame, table: String): Unit =
    df.write
      .format("jdbc")
      .mode("append")
      .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
      .option("dbtable", table)
      .option("user", "root")
      .option("password", "root")
      .save()
}
上面的代码运行良好:在 foreachBatch 中先用 batchDF.createOrReplaceTempView(...) 为当前批次创建临时视图,再通过 batchDF.sparkSession.sql(...) 对该视图执行多个查询。供参考(原文此处的参考链接已缺失)。
谢谢 @OneCricketeer 的建议。