Issue in writing records into MySQL from a Spark Structured Streaming dataframe
I am using the code below to write a Spark Structured Streaming dataframe into a MySQL DB. Below are the Kafka topic JSON data format and the MySQL table schema; the column names and types are the same. But I cannot see any records written into the MySQL table. The table is empty, with zero records. Please suggest.
Kafka topic data format
ssingh@RENLTP2N073:/mnt/d/confluent-6.0.0/bin$ ./kafka-console-consumer --topic sarvtopic --from-beginning --bootstrap-server localhost:9092
{"id":1,"firstname":"James","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1, “性别”:“男”,“薪水”:3000}
{"id":2,"firstname":"Michael","middlename":"Rose","lastname":"",""dob_year":2010,"dob_month":3, "性别":"男","薪水":4000}
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("SSKafka") \
    .getOrCreate()

dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sarvtopic") \
    .option("startingOffsets", "earliest") \
    .load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
dsraw.printSchema()
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import *

custom_schema = StructType([
    StructField("id", LongType(), True),
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("dob_year", StringType(), True),
    StructField("dob_month", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", LongType(), True),
])
Person_details_df2 = ds \
    .select(from_json(col("value"), custom_schema).alias("Person_details"))
Person_details_df3 = Person_details_df2.select("Person_details.*")
from pyspark.sql import DataFrameWriter
def foreach_batch_function(df, epoch_id):
    Person_details_df3.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', driver='com.mysql.jdbc.Driver', dbtable="sparkkafka", user='root', password='root34')
    pass
query = Person_details_df3.writeStream.trigger(processingTime='20 seconds').outputMode("append").foreachBatch(foreach_batch_function).start()
query
Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x1fb25503b08>
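As a side note, the returned StreamingQuery handle can be inspected from another cell to confirm that micro-batches are actually being processed; a minimal sketch using the standard StreamingQuery accessors:

# Check whether the stream is running and making progress.
print(query.status)        # whether a trigger is active or the query is waiting for data
print(query.lastProgress)  # metrics for the most recent micro-batch, or None
print(query.exception())   # the terminating exception if the query has failed, else None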
MySQL table schema:
create table sparkkafka(
id int,
firstname VARCHAR(40) NOT NULL,
middlename VARCHAR(40) NOT NULL,
lastname VARCHAR(40) NOT NULL,
dob_year int(40) NOT NULL,
dob_month int(40) NOT NULL,
gender VARCHAR(40) NOT NULL,
salary int(40) NOT NULL,
PRIMARY KEY (id)
);
I presume Person_details_df3 is your streaming dataframe and that your Spark version is 2.4.0 or above. To use the foreachBatch API, write it like this:
db_target_properties = {"user":"xxxx", "password":"yyyyy"}
def foreach_batch_function(df, epoch_id):
    df.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', table="sparkkafka", properties=db_target_properties)
    pass
query = Person_details_df3.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start()
query.awaitTermination()
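If the table is still empty after this change, two things worth checking are whether the MySQL JDBC driver jar is on the Spark classpath and whether a checkpoint location is set for the query. Below is a minimal end-to-end sketch based on the code above; the driver class name and the checkpoint path /tmp/sparkkafka_chkpt are assumptions, not part of the original code:

# Sketch only: assumes MySQL Connector/J is available to Spark (e.g. via --jars)
# and that /tmp/sparkkafka_chkpt is a writable directory (hypothetical path).
db_target_url = "jdbc:mysql://172.16.23.27:30038/securedb"
db_target_properties = {
    "user": "xxxx",
    "password": "yyyyy",
    "driver": "com.mysql.cj.jdbc.Driver",  # Connector/J 8.x class name; an assumption
}

def foreach_batch_function(df, epoch_id):
    # Write the bounded micro-batch dataframe (df), not the unbounded streaming one.
    df.write.jdbc(url=db_target_url, table="sparkkafka", mode="append",
                  properties=db_target_properties)

query = (Person_details_df3.writeStream
         .outputMode("append")
         .option("checkpointLocation", "/tmp/sparkkafka_chkpt")  # hypothetical path
         .foreachBatch(foreach_batch_function)
         .start())
query.awaitTermination()

With the query running, query.lastProgress can be used to confirm that rows are being read from the topic, and a SELECT COUNT(*) FROM sparkkafka on the MySQL side should grow after each micro-batch.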