部分更新 Dataframe 中记录的有效方法
Efficient way to partially update a record in Dataframe
我有一个在快照中累积批处理数据的系统。
批次中的每条记录都包含一个 unique_id 和一个版本以及多个其他列。
以前,只要在新批次中 unique_id 的版本大于快照中存在的版本,系统就会用来替换整个记录并重写为新记录。这通常是基于版本的两个数据框的合并。
例如:
Snapshot: <Uid> <Version> <col1> <col2>
-----------------
A1 | 1 | ab | cd
A2 | 1 | ef | gh
New Batch: <Uid> <Version> <col1>
------------------
A3 | 1 | gh
A1 | 2 | hh
See here col2 is absent in the new batch
After Merge It will become,
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | Null
A2 | 1 | ef | gh
这里的问题是,即使 col2
的数据不是 Uid
A2 的数据;合并后,该列将替换为空值。所以该列的旧值丢失了。
现在,我只想替换数据所在的列
即预期输出
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | cd
A2 | 1 | ef | gh
See the A1
unique id the col2 value is intact.
尽管如果批次的 A1 记录为
New Batch: <Uid> <Version> <col1> <col2>
------------------
A1 | 2 | hh | uu
输出将是
------------------
A1 | 2 | hh | uu
A2 | 1 | ef | gh
Here the entire record of A2 is replaced.
根据当前系统,我使用 spark 并将数据存储为 parquet。我可以调整 Merge
过程以合并此更改
但是,我想知道这是否是为这些用例存储数据的最佳过程。
我正在评估 Hbase
和 Hive ORC
以及我可以对合并过程进行的可能更改。
任何建议将不胜感激。
据我了解,你需要在snapshot和journal(delta)之间使用full outer join然后使用coalesce
,例如:
def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
val joinExpr = joinColumnNames
.map(column => snapshot(column) === journal(column))
.reduceLeft(_ && _)
val isThereNoJournalRecord = joinColumnNames
.map(jCol => journal(jCol).isNull)
.reduceLeft(_ && _)
val selectClause = snapshot.columns
.map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)
snapshot
.join(journal, joinExpr, "full_outer")
.select(selectClause: _*)
}
在这种情况下,如果日志具有空值,您将合并快照与日志并回退到快照值。
希望对您有所帮助!
我有一个在快照中累积批处理数据的系统。
批次中的每条记录都包含一个 unique_id 和一个版本以及多个其他列。
以前,只要在新批次中 unique_id 的版本大于快照中存在的版本,系统就会用来替换整个记录并重写为新记录。这通常是基于版本的两个数据框的合并。
例如:
Snapshot: <Uid> <Version> <col1> <col2>
-----------------
A1 | 1 | ab | cd
A2 | 1 | ef | gh
New Batch: <Uid> <Version> <col1>
------------------
A3 | 1 | gh
A1 | 2 | hh
See here col2 is absent in the new batch
After Merge It will become,
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | Null
A2 | 1 | ef | gh
这里的问题是,即使 col2
的数据不是 Uid
A2 的数据;合并后,该列将替换为空值。所以该列的旧值丢失了。
现在,我只想替换数据所在的列
即预期输出
<Uid> <Version> <col1> <col2>
------------------
A3 | 1 | gh | Null
A1 | 2 | hh | cd
A2 | 1 | ef | gh
See the
A1
unique id the col2 value is intact.
尽管如果批次的 A1 记录为
New Batch: <Uid> <Version> <col1> <col2>
------------------
A1 | 2 | hh | uu
输出将是 ------------------
A1 | 2 | hh | uu
A2 | 1 | ef | gh
Here the entire record of A2 is replaced.
根据当前系统,我使用 spark 并将数据存储为 parquet。我可以调整 Merge
过程以合并此更改
但是,我想知道这是否是为这些用例存储数据的最佳过程。
我正在评估 Hbase
和 Hive ORC
以及我可以对合并过程进行的可能更改。
任何建议将不胜感激。
据我了解,你需要在snapshot和journal(delta)之间使用full outer join然后使用coalesce
,例如:
def applyDeduplicatedJournal(snapshot: DataFrame, journal: DataFrame, joinColumnNames: Seq[String]): DataFrame = {
val joinExpr = joinColumnNames
.map(column => snapshot(column) === journal(column))
.reduceLeft(_ && _)
val isThereNoJournalRecord = joinColumnNames
.map(jCol => journal(jCol).isNull)
.reduceLeft(_ && _)
val selectClause = snapshot.columns
.map(col => when(isThereNoJournalRecord, snapshot(col)).otherwise(coalesce(journal(col), snapshot(col))) as col)
snapshot
.join(journal, joinExpr, "full_outer")
.select(selectClause: _*)
}
在这种情况下,如果日志具有空值,您将合并快照与日志并回退到快照值。
希望对您有所帮助!