Create SCD2 table from a source file that contains multiple updates for one id using Databricks/Spark
I want to create a slowly changing dimension (type 2) in Databricks. My source dataframe contains the following information:
+-------------------+-------------------------+----------+-----------+-------------+
| actionimmediately | date | deviceid | patchguid | status |
+-------------------+-------------------------+----------+-----------+-------------+
| False | 2018-08-15 04:01:00.000 | 123 | 00-001 | Install |
| True | 2018-08-16 00:00:00.000 | 123 | 00-001 | Install |
| False | 2018-08-10 01:00:00.000 | 123 | 00-001 | Not Approved|
| False | 2020-01-01 00:00:00.000 | 333 | 11-111 | Declined |
+-------------------+-------------------------+----------+-----------+-------------+
The dataframe I want as output looks like this:
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| mergekey | deviceid | patchguid | status | actionimmediately | starttime | endtime | current |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| 12300-001 | 123 | 00-001 | Not Approved | False | 2018-08-10 01:00:00.000 | 2018-08-15 04:01:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | False | 2018-08-15 04:01:00.000 | 2018-08-16 00:00:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | True | 2018-08-16 00:00:00.000 | null | True |
| 33311-111 | 333 | 11-111 | Declined | False | 2020-01-01 00:00:00.000 | null | True |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
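(A minimal sketch that builds the sample source dataframe above, assuming string ids and a timestamp date column; the real data may of course be typed differently:)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Hypothetical reproduction of the sample rows shown above
source_df = spark.createDataFrame(
    [
        (False, '2018-08-15 04:01:00.000', '123', '00-001', 'Install'),
        (True,  '2018-08-16 00:00:00.000', '123', '00-001', 'Install'),
        (False, '2018-08-10 01:00:00.000', '123', '00-001', 'Not Approved'),
        (False, '2020-01-01 00:00:00.000', '333', '11-111', 'Declined'),
    ],
    ['actionimmediately', 'date', 'deviceid', 'patchguid', 'status'],
).withColumn('date', col('date').cast('timestamp'))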
In reality, the source file contains 275,475 rows.
I have already tried two solutions, but both execute very slowly, taking roughly 10 hours.
Solution 1: Delta Lake merge
First I create a seqId, which is used later to iterate over. This is needed because a merge cannot update the same row more than once. I use a window function for this.
Create the seqId:
from pyspark.sql import Window
from pyspark.sql.functions import col, concat, row_number

source_df = source_df.withColumn('mergekey', concat(col('deviceid'), col('patchguid')))
w1 = Window.partitionBy('mergekey').orderBy('date')
source_df = source_df.withColumn('seqid', row_number().over(w1))
Then I created a for loop that iterates over every seqId and merges the rows.
In reality, max_seq_id is 1900.
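(How max_seq_id is computed is not shown in the snippets below; presumably it is just the highest seqid in the source, e.g. something along these lines:)

from pyspark.sql.functions import max as max_

# Assumption: max_seq_id is the largest seqid produced by the window above
max_seq_id = source_df.agg(max_('seqid').alias('m')).collect()[0]['m']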
def createTable(df, SeqId):
    df\
        .withColumn('mergekey', concat(col('deviceid'), col('patchguid')))\
        .select(
            'mergekey',
            'deviceid',
            'patchguid',
            'status',
            'actionimmediately',
            col('date').alias('starttime'))\
        .where(col('seqid') == SeqId)\
        .withColumn('endtime', lit(None).cast('timestamp'))\
        .withColumn('current', lit(True))\
        .write.format('delta')\
        .partitionBy('current')\
        .options(header='true', path='/mnt/destinationncentral/patch_approval')\
        .saveAsTable('patch_approval')
def MergePatchApproval(df, deltatable, seqNummer):
    dataframe = df.where(col('seqid') == seqNummer)
    newToInsert = dataframe.alias('updates')\
        .join(deltatable.toDF().alias('table'), ['deviceid', 'patchguid'])\
        .select(
            'updates.actionimmediately',
            'updates.date',
            'updates.deviceid',
            'updates.patchguid',
            'updates.status',
            'updates.seqid')\
        .where('table.current = true and \
                (table.status <> updates.status or table.actionimmediately <> updates.actionimmediately)')
    stagedUpdates = (newToInsert.selectExpr('NULL as mergekey', '*')
        .union(dataframe
            .withColumn('mergekey', concat(col('deviceid'), col('patchguid')))
            .select(
                'mergekey',
                'actionimmediately',
                'date',
                'deviceid',
                'patchguid',
                'status',
                'seqid')))
    deltatable.alias('t')\
        .merge(stagedUpdates.alias('s'), 't.current = true and t.mergekey = s.mergekey')\
        .whenMatchedUpdate(
            condition='t.current = true and \
                       (t.status <> s.status or t.actionimmediately <> s.actionimmediately)',
            set={
                'endtime': 's.date',
                'current': 'false'
            })\
        .whenNotMatchedInsert(values={
            'mergekey': 's.mergekey',
            'deviceid': 's.deviceid',
            'patchguid': 's.patchguid',
            'status': 's.status',
            'actionimmediately': 's.actionimmediately',
            'starttime': 's.date',
            'endtime': 'NULL',
            'current': 'true'
        }).execute()
for i in range(max_seq_id):
    i = i + 1
    print(i)
    df = source_df.where(col('seqid') == i)
    if(i == 1):
        tablecount = spark.sql("show tables like 'patch_approval'").count()
        if(tablecount == 0):
            createTable(df, i)
            approval_table = DeltaTable.forPath(spark, '/mnt/destinationncentral/patch_approval')
        else:
            approval_table = DeltaTable.forPath(spark, '/mnt/destinationncentral/patch_approval')
            MergePatchApproval(df, approval_table, i)
    else:
        MergePatchApproval(df, approval_table, i)
The problem I face with this solution is that writing the data to Azure Data Lake takes a while, which I consider normal, but the execution time of each iteration also keeps increasing.
Solution 2: upsert dataframes and write once at the end
In this solution I also use a for loop and the seqId, but instead of writing to Azure Data Lake on every loop, I only write once at the end.
This solves the write-latency problem, but the time each loop takes still keeps increasing.
def createDestDF(sourceDF):
    dest_df = sourceDF\
        .select(
            'mergekey',
            'deviceid',
            'patchguid',
            'status',
            'actionimmediately',
            col('date').alias('starttime'))\
        .withColumn('endtime', lit(None).cast('timestamp'))\
        .withColumn('current', lit(True))
    return dest_df
def getChangedRecords(sourceDF, destDF):
    changedRecords = sourceDF.alias('u')\
        .join(destDF.alias('t'), ['deviceid', 'patchguid'])\
        .select(
            'u.actionimmediately',
            'u.date',
            'u.deviceid',
            'u.patchguid',
            'u.status',
            'u.seqid',
            'u.mergekey')\
        .where('t.current = true and \
                (t.status <> u.status or t.actionimmediately <> u.actionimmediately)')
    return changedRecords
def getNewRecords(sourceDF, destDF):
    newRecords = sourceDF.alias('n')\
        .join(destDF.alias('t'), ['deviceid', 'patchguid'], 'left')\
        .select(
            't.mergekey',
            'n.actionimmediately',
            'n.date',
            'deviceid',
            'patchguid',
            'n.status',
            'n.seqid')\
        .where('t.current is null')
    return newRecords
def upsertChangedRecords(sourceDF, destDF):
    endTimeColumn = expr("""IF(endtimeOld IS NULL, date, endtimeOld)""")
    currentColumn = expr("""IF(date IS NULL, currentOld, False)""")
    updateDF = sourceDF.alias('s').join(destDF.alias('t'), 'mergekey', 'right').select(
            'mergekey',
            't.deviceid',
            't.patchguid',
            't.status',
            't.actionimmediately',
            't.starttime',
            's.date',
            col('t.current').alias('currentOld'),
            col('t.endTime').alias('endtimeOld'))\
        .withColumn('endtime', endTimeColumn)\
        .withColumn('current', currentColumn)\
        .drop('currentOld', 'date', 'endTimeOld')
    updateInsertDF = sourceDF\
        .select(
            'mergekey',
            'deviceid',
            'patchguid',
            'status',
            'actionimmediately',
            col('date').alias('starttime'))\
        .withColumn('endtime', lit(None).cast('timestamp'))\
        .withColumn('current', lit(True))
    resultDF = updateDF.union(updateInsertDF)
    return resultDF
def insertNewRecords(sourceDF, destDF):
    insertDF = sourceDF\
        .select(
            'mergekey',
            'deviceid',
            'patchguid',
            'status',
            'actionimmediately',
            col('date').alias('starttime'))\
        .withColumn('endtime', lit(None).cast('timestamp'))\
        .withColumn('current', lit(True))
    resultDF = destDF.union(insertDF)
    return resultDF
for i in range(max_seq_id):
    i = i + 1
    print(i)
    seq_df = source_df.where(col('seqid') == i)
    if i == 1:
        tablecount = spark.sql("show tables like 'patch_approval'").count()
        if(tablecount == 0):
            dest_df = createDestDF(seq_df)
        else:
            changed_df = getChangedRecords(seq_df, dest_df)
            new_df = getNewRecords(seq_df, dest_df)
            dest_df = upsertChangedRecords(changed_df, dest_df)
            dest_df = insertNewRecords(new_df, dest_df)
    else:
        changed_df = getChangedRecords(seq_df, dest_df)
        new_df = getNewRecords(seq_df, dest_df)
        dest_df = upsertChangedRecords(changed_df, dest_df)
        dest_df = insertNewRecords(new_df, dest_df)

dest_df\
    .write\
    .format('delta')\
    .partitionBy('current')\
    .mode('overwrite')\
    .options(header='true', path='/mnt/destinationncentral/patch_approval')\
    .saveAsTable('patch_approval')
Any idea how to fix the ever-increasing execution time in the for loop?
Kind regards,
As I understand it, rows never disappear from your source table over time. If that is the case, your problem can be solved by putting your Spark dataframe into a temporary view and writing a single query against it:
df.createOrReplaceTempView("source")
df_scd = spark.sql("""
WITH stage AS (
SELECT *,
LEAD(date,1) OVER (PARTITION BY deviceid, patchguid ORDER BY date) AS next_date
FROM source
)
SELECT
concat(deviceid, patchguid) as mergekey
,deviceid
,patchguid
,status
,actionimmediately
,date AS starttime
,next_date AS endtime
,CASE WHEN next_date IS NULL THEN True ELSE False END AS current
FROM stage
""")
It should be very fast and produce exactly the output you want. I checked it against your sample data, and df_scd then shows:
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
| mergekey|deviceid|patchguid| status|actionimmediately| starttime| endtime|current|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
|12300-001| 123| 00-001|Not Approved| False|2018-08-10 01:00:...|2018-08-15 04:01:...| false|
|12300-001| 123| 00-001| Install| False|2018-08-15 04:01:...|2018-08-16 00:00:...| false|
|12300-001| 123| 00-001| Install| True|2018-08-16 00:00:...| null| true|
|33311-111| 333| 11-111| Declined| False|2020-01-01 00:00:...| null| true|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
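If you then want to persist the result as your Delta table, a minimal sketch (the path and table name are taken from your question, and overwrite mode is assumed):

# Write the SCD2 result once, instead of merging per seqid
df_scd.write\
    .format('delta')\
    .mode('overwrite')\
    .option('path', '/mnt/destinationncentral/patch_approval')\
    .saveAsTable('patch_approval')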