spark structured streaming batch 数据刷新问题(partition by clause)

spark structured streaming batch data refresh issue (partition by clause)

我在将 Spark 结构化流(Structured Streaming)数据帧与批处理数据帧连接时遇到了问题。我的场景是:有一个 S3 流,需要与历史数据做左反连接(left anti join),返回历史数据中不存在的记录(即找出新记录),并将这些记录以追加方式写入历史数据(按列对磁盘上的数据进行分区,而不是内存中的分区)。

当我刷新已分区的历史数据框时,我的历史数据框没有得到更新。

下面是两个代码片段,一个有效,另一个无效。

工作代码和非工作代码之间的唯一区别是 partitionBy 子句。

工作代码:-(刷新历史记录)

import spark.implicits._

    // Schema applied to both the streaming CSV input and the history read below:
    // six columns, all typed as strings.
    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    // Same six string columns as inputSchema.
    // NOTE(review): historySchema is not referenced anywhere else in this snippet;
    // every read below uses inputSchema.
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    // Streaming source: CSV files without a header row, read from the Input directory.
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    // Batch read of the history directory as CSV. Every column is renamed with a "_2"
    // suffix so the join condition below can address the two sides unambiguously.
    // NOTE(review): header is "true" here, while the append inside foreachBatch sets no
    // header option — confirm the history files really start with a header line.
    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    // Mark the history DataFrame as persisted and register it as temp view "hist".
    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    // Left-anti join: keep the source rows for which no history row matches on
    // account_id, run_dt, trxn_ref_id, trxn_dt and trxn_amt (spark_id is not compared).
    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    // For each micro-batch: append the new records to the history directory, then
    // re-read that directory and re-register it as temp view "hist".
    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Append the batch as CSV; partitionBy is commented out in this variant.
        batchDF.write
          .mode(SaveMode.Append)
          //.partitionBy("spark_id", "account_id", "run_dt")
          .csv("src/main/resources/history/")

        // NOTE(review): the batch above is written as CSV, but the same directory is
        // read back here as Parquet — confirm which format is intended.
        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        // NOTE(review): unpersist is called on the DataFrame created just above, not on
        // the `history` DataFrame persisted before the join — confirm this is intended.
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        // NOTE(review): the join above references `history` directly and never reads the
        // "hist" view, so replacing the view presumably does not change what the join
        // sees — verify.
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    // NOTE(review): the message mentions kafka, but `source` is read from CSV files.
    println("This is the kafka dataset:")
    // Debug query: print every input row to the console with an extra "Input" column.
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Debug query: print the anti-join output to the console with an extra "reject" column.
    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Block until any one of the streaming queries started above terminates.
    spark.streams.awaitAnyTermination()

不起作用:-(历史未刷新)

import spark.implicits._

    // Schema applied to both the streaming CSV input and the history read below:
    // six columns, all typed as strings.
    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    // Same six string columns as inputSchema.
    // NOTE(review): historySchema is not referenced anywhere else in this snippet;
    // every read below uses inputSchema.
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    // Streaming source: CSV files without a header row, read from the Input directory.
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    // Batch read of the history directory as CSV. Every column is renamed with a "_2"
    // suffix so the join condition below can address the two sides unambiguously.
    // NOTE(review): header is "true" here, while the append inside foreachBatch sets no
    // header option — confirm the history files really start with a header line.
    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    // Mark the history DataFrame as persisted and register it as temp view "hist".
    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    // Left-anti join: keep the source rows for which no history row matches on
    // account_id, run_dt, trxn_ref_id, trxn_dt and trxn_amt (spark_id is not compared).
    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    // For each micro-batch: append the new records to the history directory, then
    // re-read that directory and re-register it as temp view "hist".
    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Append the batch as CSV, partitioned on disk by spark_id, account_id and run_dt.
        // NOTE(review): with partitionBy the partition columns are presumably encoded in
        // the directory names rather than in the data files, which changes the on-disk
        // layout the history reads see — verify.
        batchDF.write
          .mode(SaveMode.Append)
          .partitionBy("spark_id", "account_id","run_dt")
          .csv("src/main/resources/history/")

        // NOTE(review): the batch above is written as CSV, but the same directory is
        // read back here as Parquet — confirm which format is intended.
        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        // NOTE(review): unpersist is called on the DataFrame created just above, not on
        // the `history` DataFrame persisted before the join — confirm this is intended.
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        // NOTE(review): the join above references `history` directly and never reads the
        // "hist" view, so replacing the view presumably does not change what the join
        // sees — verify.
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    // NOTE(review): the message mentions kafka, but `source` is read from CSV files.
    println("This is the kafka dataset:")
    // Debug query: print every input row to the console with an extra "Input" column.
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Debug query: print the anti-join output to the console with an extra "reject" column.
    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Block until any one of the streaming queries started above terminates.
    spark.streams.awaitAnyTermination()

谢谢 斯里

我通过使用 unionByName 函数(而不是从磁盘重新读取数据来刷新)解决了这个问题。

第 1 步:- 读取历史 S3

第 2 步:- 读取 Kafka 流,并在历史数据中查找

第 3 步:- 将处理后的数据写入磁盘,并使用 Spark 的 unionByName 函数将其追加到步骤 1 中创建的数据帧。

第 1 步代码(读取历史数据帧):-

// Step 1: load the account history as Parquet from `S3path` using the explicit `schema`,
// mark it as persisted, and expose the persisted DataFrame as temp view "acctHist".
val acctHistDF = sparkSession.read.schema(schema).parquet(S3path)
val acctHistDFPersisted = acctHistDF.persist()
acctHistDFPersisted.createOrReplaceTempView("acctHist")

第 2 步(使用流数据刷新历史数据帧):-

// Step 2: refresh the "acctHist" view with the rows of the current streaming batch.
// unionByName returns a new DataFrame and leaves `history` untouched, so the result
// must be captured and registered; re-registering `history` itself would leave the
// view's contents unchanged.
// NOTE(review): `stream` is not defined in this snippet — presumably the micro-batch
// DataFrame available inside foreachBatch; confirm against the caller.
val history = sparkSession.table("acctHist")
val refreshedHistory = history.unionByName(stream)
refreshedHistory.createOrReplaceTempView("acctHist")

谢谢 斯里