Pyspark streaming 将旋转批次附加到 sql 服务器

Pyspark streaming append pivoted batch to sql server

我正在使用 pyspark streaming 从 kafka 服务器流式传输数据,逐批处理它(使用 foreachBatch),并使用 jdbc 将每个批次附加到 Microsoft SQL 服务器。

以下是我的代码的主要相关部分:

定义流

string_value_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", constants.kafka_server) \
    .option("subscribe", "topics") \
    .option("maxOffsetsPerTrigger", 1000) \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

定义模式和操作 df

schema = T.StructType([T.StructField('StationId', T.StringType(), False),
                            T.StructField('Date', T.StringType(), False),
                            T.StructField('Variable', T.StringType(), False),
                            T.StructField('Value', T.IntegerType(), False)])

json_df = string_value_df.select(F.from_json(F.col("value"),schema=schema).alias('json'))
streaming_df = json_df.select("json.*")

开始直播

query = df \
    .writeStream \
    .foreachBatch(write_to_sql) \
    .outputMode("Update") \
    .start() \
    .awaitTermination()

在每次批处理过程中,我使用 pivot 将多个可变值记录转换为列形式的变量,同时根据“StationID”和“Date”进行分组

原始数据形式(每批如何到达)为:

站号 日期 变量 价值
一个 yyyyMMdd PRCP x1
一个 yyyyMMdd x2
一个 yyyyMMdd SNWD x3
一个 yyyyMMdd TMAX x4
一个 yyyyMMdd TMIN x5

在我的转换之后,包括 pivot:

站号 日期 PRCP SNWD TMAX TMIN
一个 yyyyMMdd x1 x2 x3 x4 x5

这是应用于每个批次的函数:

凑一批

def write_to_sql(df, df_id):
    df = df.groupBy("StationId", "Date").pivot("Variable").sum("Value")

    try:
        df.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", constants.url) \
            .option("dbtable", constants.table_name) \
            .option("user", constants.username) \
            .option("password", constants.password) \
            .save()
    except ValueError as error:
        print("Connector write failed", error)

我的问题是将新批次附加到服务器上的 SQL table。
当同一批次中有两行具有相同的 时,数据透视表工作正常并正确显示在服务器中。
在内部,如果特定 对的多个(不同变量)记录被分成多个批次,然后将其附加到服务器时,它似乎没有完全分组。

站号 日期 PRCP SNWD TMAX TMIN
一个 yyyyMMdd x1 x3
一个 yyyyMMdd x2

[x1和x3出现在同一批]
有没有什么有效的方法可以将批次附加到服务器,同时保持 跨不同变量的分组?
非常感谢

您可以提前执行以下操作来匹配架构,这样您就不会 运行 遇到麻烦: 这是我在尝试过滤掉 4 个变量 PRCPTMAXTMINSNWD.

时对练习的解决方案
batch_df = batch_df.filter(batch_df.Q_Flag.isNull()).drop("M_Flag", "Q_Flag", "S_Flag", "ObsTime")
batch_df = batch_df.withColumn("Year_Rec", F.expr("substring(Date,0,4)").cast(IntegerType())).withColumn(
        "Month_Rec", F.expr("substring(Date,5,2)").cast(IntegerType())).drop("Date")
batch_df = batch_df.filter(
        ((batch_df.Variable == "PRCP") & (batch_df.Value >= 0))
        | (batch_df.Variable == "TMAX")
        | (batch_df.Variable == "TMIN")
        | ((batch_df.Variable == "SNWD") & (batch_df.Value >= 0)))
    ger_swe_df = batch_df.groupBy("StationId", "Year_Rec", "Month_Rec", "Variable") \
        .agg(F.sum(F.col("Value")).alias("batch_sum"), F.count("*").alias("batch_count"))
    ger_swe_df = ger_swe_df.groupby("StationId", "Year_Rec", "Month_Rec").pivot("Variable") \
        .agg(F.first("batch_count").cast(LongType()).alias('batch_count'),
             F.first("batch_sum").cast(LongType()).alias('batch_sum'))
cols = {str(col)[:4] for col in ger_swe_df.columns if str(col)[:4] in vars}
missing = vars - cols
for var in missing:
    ger_swe_df = ger_swe_df.withColumn(var + '_batch_count', F.lit(None).cast(LongType())).withColumn(
            var + '_batch_sum', F.lit(None).cast(LongType()))
ger_swe_df = ger_swe_df.select("StationId", "Year_Rec", "Month_Rec", "PRCP_batch_count", 'PRCP_batch_sum',
                                   'TMAX_batch_count', 'TMAX_batch_sum', 'TMIN_batch_count', 'TMIN_batch_sum',
                                   'SNWD_batch_count', 'SNWD_batch_sum') \
        .withColumn("Season", ((F.col("Month_Rec") % 12) / 3 + 1).cast(IntegerType()))
ger_swe_df.withColumn("batchId", F.lit(batch_id)) \
        .write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .save()