Pyspark streaming 将旋转批次附加到 sql 服务器
Pyspark streaming append pivoted batch to sql server
我正在使用 pyspark streaming 从 kafka 服务器流式传输数据,逐批处理它(使用 foreachBatch),并使用 jdbc 将每个批次附加到 Microsoft SQL 服务器。
以下是我的代码的主要相关部分:
定义流
string_value_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", constants.kafka_server) \
.option("subscribe", "topics") \
.option("maxOffsetsPerTrigger", 1000) \
.option("startingOffsets", "earliest") \
.load() \
.selectExpr("CAST(value AS STRING)")
定义模式和操作 df
schema = T.StructType([T.StructField('StationId', T.StringType(), False),
T.StructField('Date', T.StringType(), False),
T.StructField('Variable', T.StringType(), False),
T.StructField('Value', T.IntegerType(), False)])
json_df = string_value_df.select(F.from_json(F.col("value"),schema=schema).alias('json'))
streaming_df = json_df.select("json.*")
开始直播
query = df \
.writeStream \
.foreachBatch(write_to_sql) \
.outputMode("Update") \
.start() \
.awaitTermination()
在每次批处理过程中,我使用 pivot 将多个可变值记录转换为列形式的变量,同时根据“StationID”和“Date”进行分组
原始数据形式(每批如何到达)为:
站号
日期
变量
价值
一个
yyyyMMdd
PRCP
x1
一个
yyyyMMdd
雪
x2
一个
yyyyMMdd
SNWD
x3
一个
yyyyMMdd
TMAX
x4
一个
yyyyMMdd
TMIN
x5
在我的转换之后,包括 pivot:
站号
日期
PRCP
雪
SNWD
TMAX
TMIN
一个
yyyyMMdd
x1
x2
x3
x4
x5
这是应用于每个批次的函数:
凑一批
def write_to_sql(df, df_id):
df = df.groupBy("StationId", "Date").pivot("Variable").sum("Value")
try:
df.write \
.format("jdbc") \
.mode("append") \
.option("url", constants.url) \
.option("dbtable", constants.table_name) \
.option("user", constants.username) \
.option("password", constants.password) \
.save()
except ValueError as error:
print("Connector write failed", error)
我的问题是将新批次附加到服务器上的 SQL table。
当同一批次中有两行具有相同的 时,数据透视表工作正常并正确显示在服务器中。
在内部,如果特定 对的多个(不同变量)记录被分成多个批次,然后将其附加到服务器时,它似乎没有完全分组。
站号
日期
PRCP
雪
SNWD
TMAX
TMIN
一个
yyyyMMdd
x1
无
x3
无
无
一个
yyyyMMdd
无
x2
无
无
无
[x1和x3出现在同一批]
有没有什么有效的方法可以将批次附加到服务器,同时保持 跨不同变量的分组?
非常感谢
您可以提前执行以下操作来匹配架构,这样您就不会 运行 遇到麻烦:
这是我在尝试过滤掉 4 个变量 PRCP
、TMAX
、TMIN
、SNWD
.
时对练习的解决方案
batch_df = batch_df.filter(batch_df.Q_Flag.isNull()).drop("M_Flag", "Q_Flag", "S_Flag", "ObsTime")
batch_df = batch_df.withColumn("Year_Rec", F.expr("substring(Date,0,4)").cast(IntegerType())).withColumn(
"Month_Rec", F.expr("substring(Date,5,2)").cast(IntegerType())).drop("Date")
batch_df = batch_df.filter(
((batch_df.Variable == "PRCP") & (batch_df.Value >= 0))
| (batch_df.Variable == "TMAX")
| (batch_df.Variable == "TMIN")
| ((batch_df.Variable == "SNWD") & (batch_df.Value >= 0)))
ger_swe_df = batch_df.groupBy("StationId", "Year_Rec", "Month_Rec", "Variable") \
.agg(F.sum(F.col("Value")).alias("batch_sum"), F.count("*").alias("batch_count"))
ger_swe_df = ger_swe_df.groupby("StationId", "Year_Rec", "Month_Rec").pivot("Variable") \
.agg(F.first("batch_count").cast(LongType()).alias('batch_count'),
F.first("batch_sum").cast(LongType()).alias('batch_sum'))
cols = {str(col)[:4] for col in ger_swe_df.columns if str(col)[:4] in vars}
missing = vars - cols
for var in missing:
ger_swe_df = ger_swe_df.withColumn(var + '_batch_count', F.lit(None).cast(LongType())).withColumn(
var + '_batch_sum', F.lit(None).cast(LongType()))
ger_swe_df = ger_swe_df.select("StationId", "Year_Rec", "Month_Rec", "PRCP_batch_count", 'PRCP_batch_sum',
'TMAX_batch_count', 'TMAX_batch_sum', 'TMIN_batch_count', 'TMIN_batch_sum',
'SNWD_batch_count', 'SNWD_batch_sum') \
.withColumn("Season", ((F.col("Month_Rec") % 12) / 3 + 1).cast(IntegerType()))
ger_swe_df.withColumn("batchId", F.lit(batch_id)) \
.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
我正在使用 pyspark streaming 从 kafka 服务器流式传输数据,逐批处理它(使用 foreachBatch),并使用 jdbc 将每个批次附加到 Microsoft SQL 服务器。
以下是我的代码的主要相关部分:
定义流
string_value_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", constants.kafka_server) \
.option("subscribe", "topics") \
.option("maxOffsetsPerTrigger", 1000) \
.option("startingOffsets", "earliest") \
.load() \
.selectExpr("CAST(value AS STRING)")
定义模式和操作 df
schema = T.StructType([T.StructField('StationId', T.StringType(), False),
T.StructField('Date', T.StringType(), False),
T.StructField('Variable', T.StringType(), False),
T.StructField('Value', T.IntegerType(), False)])
json_df = string_value_df.select(F.from_json(F.col("value"),schema=schema).alias('json'))
streaming_df = json_df.select("json.*")
开始直播
query = df \
.writeStream \
.foreachBatch(write_to_sql) \
.outputMode("Update") \
.start() \
.awaitTermination()
在每次批处理过程中,我使用 pivot 将多个可变值记录转换为列形式的变量,同时根据“StationID”和“Date”进行分组
原始数据形式(每批如何到达)为:
站号 | 日期 | 变量 | 价值 |
---|---|---|---|
一个 | yyyyMMdd | PRCP | x1 |
一个 | yyyyMMdd | 雪 | x2 |
一个 | yyyyMMdd | SNWD | x3 |
一个 | yyyyMMdd | TMAX | x4 |
一个 | yyyyMMdd | TMIN | x5 |
在我的转换之后,包括 pivot:
站号 | 日期 | PRCP | 雪 | SNWD | TMAX | TMIN |
---|---|---|---|---|---|---|
一个 | yyyyMMdd | x1 | x2 | x3 | x4 | x5 |
这是应用于每个批次的函数:
凑一批
def write_to_sql(df, df_id):
df = df.groupBy("StationId", "Date").pivot("Variable").sum("Value")
try:
df.write \
.format("jdbc") \
.mode("append") \
.option("url", constants.url) \
.option("dbtable", constants.table_name) \
.option("user", constants.username) \
.option("password", constants.password) \
.save()
except ValueError as error:
print("Connector write failed", error)
我的问题是将新批次附加到服务器上的 SQL table。
当同一批次中有两行具有相同的
在内部,如果特定
站号 | 日期 | PRCP | 雪 | SNWD | TMAX | TMIN |
---|---|---|---|---|---|---|
一个 | yyyyMMdd | x1 | 无 | x3 | 无 | 无 |
一个 | yyyyMMdd | 无 | x2 | 无 | 无 | 无 |
[x1和x3出现在同一批]
有没有什么有效的方法可以将批次附加到服务器,同时保持
非常感谢
您可以提前执行以下操作来匹配架构,这样您就不会 运行 遇到麻烦:
这是我在尝试过滤掉 4 个变量 PRCP
、TMAX
、TMIN
、SNWD
.
batch_df = batch_df.filter(batch_df.Q_Flag.isNull()).drop("M_Flag", "Q_Flag", "S_Flag", "ObsTime")
batch_df = batch_df.withColumn("Year_Rec", F.expr("substring(Date,0,4)").cast(IntegerType())).withColumn(
"Month_Rec", F.expr("substring(Date,5,2)").cast(IntegerType())).drop("Date")
batch_df = batch_df.filter(
((batch_df.Variable == "PRCP") & (batch_df.Value >= 0))
| (batch_df.Variable == "TMAX")
| (batch_df.Variable == "TMIN")
| ((batch_df.Variable == "SNWD") & (batch_df.Value >= 0)))
ger_swe_df = batch_df.groupBy("StationId", "Year_Rec", "Month_Rec", "Variable") \
.agg(F.sum(F.col("Value")).alias("batch_sum"), F.count("*").alias("batch_count"))
ger_swe_df = ger_swe_df.groupby("StationId", "Year_Rec", "Month_Rec").pivot("Variable") \
.agg(F.first("batch_count").cast(LongType()).alias('batch_count'),
F.first("batch_sum").cast(LongType()).alias('batch_sum'))
cols = {str(col)[:4] for col in ger_swe_df.columns if str(col)[:4] in vars}
missing = vars - cols
for var in missing:
ger_swe_df = ger_swe_df.withColumn(var + '_batch_count', F.lit(None).cast(LongType())).withColumn(
var + '_batch_sum', F.lit(None).cast(LongType()))
ger_swe_df = ger_swe_df.select("StationId", "Year_Rec", "Month_Rec", "PRCP_batch_count", 'PRCP_batch_sum',
'TMAX_batch_count', 'TMAX_batch_sum', 'TMIN_batch_count', 'TMIN_batch_sum',
'SNWD_batch_count', 'SNWD_batch_sum') \
.withColumn("Season", ((F.col("Month_Rec") % 12) / 3 + 1).cast(IntegerType()))
ger_swe_df.withColumn("batchId", F.lit(batch_id)) \
.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()