来自 kafka spark 2.4.5 的 Spark 结构化流式传输未保存到 mysql db

Spark structured streaming from kafka spark 2.4.5 not saving into mysql db

来自 kafka 生产者的消息正在流式传输,但未保存在数据库中。我正在使用 mysql 8.0.20。我错过了什么吗?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args:Array[String]):Unit={
      System.setProperty("hadoop.home.dir", "C:\hadoop\")
      val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel("OFF")
      import spark.implicits._
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "demo1")
        .option("startingOffsets", "earliest") 
        .load()


val personStringDF = df.selectExpr("CAST(value AS STRING)")
val schema=new StructType()
      .add("stock_name",StringType)
      .add("stock_price",IntegerType)
      .add("date",StringType)
      
 val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
   .select("data.*")
    personDF.createOrReplaceTempView("persontab")
    
      spark.sql("""select min(stock_price) as min_stock_price,
max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from persontab""")
.writeStream.foreachBatch {(batchDF, batchId) =>
      println("inside the foreachbatch")
     batchDF.show() 
  batchDF.write.format("jdbc")
.mode("append").option("url","jdbc:mysql://localhost:3306/gkstorespipelinedb")
.option("dbtable","max_min_avg")
.option("user","root")
.option("password","root")
.option("header","true")
.save()

println("saved")
      
    }
      .outputMode("complete")
  .start()
 
  }
}

下面是pom.xml

 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
     <version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
             <version>2.4.5</version>
        </dependency> 
        
         <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>2.4.5</version>
        </dependency> 
       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency> 
      

我必须包含另外两个 sql 查询,我是否需要为每个 sql 查询编写单独的 foreachbatch?或者是否有任何代码优化。有什么建议吗

package mysql.kafka.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args:Array[String]):Unit={
      System.setProperty("hadoop.home.dir", "C:\hadoop\")
      val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel("OFF")
      import spark.implicits._
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "demo2")
        .option("startingOffsets", "earliest") // From starting
        .load()


val personStringDF = df.selectExpr("CAST(value AS STRING)")
val schema=new StructType()
      .add("stock_name",StringType)
      .add("stock_price",IntegerType)
      .add("date",StringType)
      
 val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
   .select("data.*")
    personDF.createOrReplaceTempView("persontab")
   
    
spark.sql("""select min(stock_price) as min_stock_price,
max(stock_price) as max_stock_price,avg(stock_price) as avg_stock_price from persontab""")
.writeStream.outputMode("complete").foreachBatch{(batchDF:DataFrame,batchId:Long) =>
println("inside the foreachbatch1")
batchDF.show()
batchDF.write.format("jdbc")
.mode("append").option("url","jdbc:mysql://localhost:3306/gkstorespipelinedb")
.option("dbtable","max_min_avg")
.option("user","root")
.option("password","root")
.option("header","true")
.save()

println("saved")
      
    }
  .start()
  .awaitTermination()
 
  }
 
}

以上代码现在可以运行了。我在 start 命令之前删除了 outputMode("complete"),并保留在 writestream 之后。所以在那之后它工作正常。