部分更新 Dataframe 中记录的有效方法

Efficient way to partially update a record in Dataframe

我有一个在快照中累积批处理数据的系统。

批次中的每条记录都包含一个 unique_id 和一个版本以及多个其他列。

以前,只要在新批次中 unique_id 的版本大于快照中存在的版本,系统就会用来替换整个记录并重写为新记录。这通常是基于版本的两个数据框的合并。

例如:

 Snapshot: <Uid>   <Version> <col1> <col2>
           -----------------
              A1  | 1     |  ab | cd
              A2  | 1     |  ef | gh

 New Batch: <Uid>  <Version> <col1> 
           ------------------
              A3  | 1     |  gh
              A1  | 2     |  hh

See here col2 is absent in the new batch

 After Merge It will become,

           <Uid>  <Version> <col1> <col2>
           ------------------
              A3  | 1     |  gh  | Null
              A1  | 2     |  hh  | Null
              A2  | 1     |  ef  | gh

这里的问题是,即使 col2 的数据不是 Uid A2 的数据;合并后,该列将替换为空值。所以该列的旧值丢失了。

现在,我只想替换数据所在的列

即预期输出

             <Uid>  <Version> <col1> <col2>
           ------------------
              A3  | 1     |  gh  | Null
              A1  | 2     |  hh  | cd
              A2  | 1     |  ef  | gh

See the A1 unique id the col2 value is intact.

尽管如果批次的 A1 记录为

New Batch: <Uid>  <Version> <col1> <col2>
           ------------------
            A1  | 2     |  hh  | uu

输出将是 ------------------

              A1  | 2     |  hh  | uu
              A2  | 1     |  ef  | gh

Here the entire record of A2 is replaced.

根据当前系统,我使用 spark 并将数据存储为 parquet。我可以调整 Merge 过程以合并此更改

但是,我想知道这是否是为这些用例存储数据的最佳过程。

我正在评估 Hbase 和 Hive ORC 以及我可以对合并过程进行的可能更改。

任何建议将不胜感激。

据我了解,你需要在snapshot和journal(delta)之间使用full outer join然后使用coalesce,例如:

  def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {

    val joinExpr = joinColumnNames
      .map(column => snapshot(column) === journal(column))
      .reduceLeft(_ && _)

    val isThereNoJournalRecord = joinColumnNames
      .map(jCol => journal(jCol).isNull)
      .reduceLeft(_ && _)

    val selectClause = snapshot.columns
      .map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)

    snapshot
      .join(journal, joinExpr, "full_outer")
      .select(selectClause: _*)
}

在这种情况下,如果日志具有空值,您将合并快照与日志并回退到快照值。

希望对您有所帮助!