Spark structured streaming from Kafka (Spark 2.4.5) not saving into MySQL DB
Messages from the Kafka producer are streaming in, but nothing is being saved into the database. I am using MySQL 8.0.20. Am I missing something?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo1")
      .option("startingOffsets", "earliest")
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    personDF.createOrReplaceTempView("persontab")

    spark.sql("""select min(stock_price) as min_stock_price,
        max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream.foreachBatch { (batchDF, batchId) =>
        println("inside the foreachbatch")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append").option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved")
      }
      .outputMode("complete")
      .start()
  }
}
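(As an aside, one quick way to rule out a parsing problem is to check the schema against a sample message inside main, right after the schema is defined; the JSON values here are made up:)

    // Sanity check: parse one hypothetical message with the same schema.
    val sample = Seq("""{"stock_name":"ACME","stock_price":42,"date":"2020-06-01"}""").toDF("value")
    sample.select(from_json(col("value"), schema).as("data")).select("data.*").show()

If from_json returns all-null rows, the incoming messages do not match the schema, and the aggregates written to MySQL would be null as well.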
Below is the pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
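(If the job still writes nothing, it is worth confirming that Spark actually picks up the MySQL driver. A minimal helper, as a sketch reusing the connection settings above; com.mysql.cj.jdbc.Driver is the Connector/J 8.x driver class, and the driver option pins it explicitly:)

    import org.apache.spark.sql.DataFrame

    // Sketch: write a DataFrame to MySQL with the driver class pinned,
    // in case Spark cannot auto-detect it from the JDBC URL.
    def writeToMySql(df: DataFrame, table: String): Unit =
      df.write.format("jdbc")
        .mode("append")
        .option("driver", "com.mysql.cj.jdbc.Driver") // Connector/J 8.x driver class
        .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
        .option("dbtable", table)
        .option("user", "root")
        .option("password", "root")
        .save()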
I also have to include two more SQL queries. Do I need to write a separate foreachBatch for each query, or is there a way to optimize the code? Any suggestions?
package mysql.kafka.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    personDF.createOrReplaceTempView("persontab")

    spark.sql("""select min(stock_price) as min_stock_price,
        max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream.outputMode("complete").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append").option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true") // no-op here: "header" is a CSV option, ignored by the JDBC writer
          .save()
        println("saved")
      }
      .start()
      .awaitTermination() // keep the driver alive so the streaming query can run
  }
}
The above code works now. I removed the outputMode("complete") that was just before the start() call and kept it right after writeStream; after that it works fine. (Note that the working version also calls awaitTermination(), which blocks main and keeps the driver alive so the streaming query can actually run.)
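On the follow-up question about the two additional SQL queries: a separate foreachBatch per query is not needed. One option is to stream the parsed personDF rows and run all queries inside a single foreachBatch, persisting the batch so it is only scanned once. A sketch under those assumptions; the stock_stats table and its query are hypothetical, and note that aggregates computed this way cover only the current micro-batch rather than the whole stream, unlike the complete-mode aggregation above:

    // Sketch: one foreachBatch serving several SQL queries over the same batch.
    personDF.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.persist() // reuse the batch across queries instead of recomputing it
      batchDF.createOrReplaceTempView("batch_tab")
      val session = batchDF.sparkSession

      def saveTo(table: String, df: DataFrame): Unit =
        df.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", table)
          .option("user", "root")
          .option("password", "root")
          .save()

      saveTo("max_min_avg", session.sql(
        """select min(stock_price) as min_stock_price,
           max(stock_price) as max_stock_price,
           avg(stock_price) as avg_stock_price from batch_tab"""))

      // Hypothetical second query: per-stock averages.
      saveTo("stock_stats", session.sql(
        "select stock_name, avg(stock_price) as avg_price from batch_tab group by stock_name"))

      batchDF.unpersist()
    }
    .start()
    .awaitTermination()

Persisting the batch means Kafka is read once per trigger no matter how many queries run, and a single stream leaves only one checkpoint location to manage.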