如何在 Azure DataLake 中合并基础和多个增量结构化流
How to merge base and multiple delta structured streams in Azure DataLake
我有一个基本流,并且我有多个不同连续日期的增量流。我想合并它们以获得最终流。我如何在 Azure Datalake 中完成此操作。例如,假设这些是流。我需要合并这些流以获得最终流。合并会将原始值替换为新值。目前delta流的数量已经超过100个
基本流:
1022918 300.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 1000.00 1000.00 2 7 6 100
1022920 2000.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 300.00 300.00 2 162 9 100
增量流 1:
1022918 400.00 300.00 2 7 5 100
1022919 2000.00 1000.00 2 7 6 100
1022920 3000.00 2000.00 2 170 6 100
1022922 400.00 300.00 2 162 9 100
增量流 2:
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
预期输出
1022918 400.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 400.00 300.00 2 162 9 100
Azure Data Lake Store 是一个仅附加文件系统。这意味着记录只能添加到文件的末尾。这与许多其他仅附加系统没有太大不同。 Azure Data Lake Analytics 和 Hive 等各种分析应用程序可用于逻辑合并这些基本流和增量流
实现这一点通常需要 4 个步骤。
加载基础数据 - 这很简单。加载基础数据时,您需要添加标识符。例如基准日期或基准版本。对于此讨论,假设您使用版本号。假设基础数据是版本 0
自己加载增量数据table/file - 加载它们时,您还需要有一个标识符,可用于与基础数据进行比较以确定最新记录。假设我们也有版本号。所以新的记录会是一个更大的版本号1、2、3等等。
构建合并视图 - 这是一个将基础数据与版本最大的 ID 上的增量数据连接起来的查询。现在当你得到这个视图时,这是所有最新的记录。
使用合并视图中的记录创建新的基础数据。
这里有一篇文章解释了如何使用 Hive 来实现这一点。这个使用datetime来识别最新的记录。
https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/
我已经使用您的测试数据在 U-SQL
中编写了一个示例。它采用上述方法,并根据您描述的唯一键和某种数据组件获取最后一条记录。
我使用了这些文件:
有几个假设:
- 文件是 space 分隔的
- 文件名(或可能是文件夹)包含日期部分
代码:
// Assumptions:
// col1, 5 and 6 is the intented unique key
// each file includes a date in the filename
DECLARE @baseFilesLocation string = "input/base_{filedate}.txt";
DECLARE @deltaFilesLocation string = "input/delta_{filedate}.txt";
// Get the base files
@baseFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @baseFilesLocation
USING Extractors.Text(delimiter : ' ');
// Get the delta files
@deltaFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @deltaFilesLocation
USING Extractors.Text(delimiter : ' ', silent: true);
@working =
SELECT *
FROM @baseFiles
UNION ALL
SELECT *
FROM @deltaFiles;
// Work out the (col1, 5 and 6) and max filedate combination
@maxDates =
SELECT col1, col5, col6, MAX(filedate) AS filedate
FROM @working
GROUP BY col1, col5, col6;
// Join the original set (all base data, all delta files) to the max dates rowset, to get last record for each col1
@output = SELECT w.*
FROM @working AS w
SEMIJOIN @maxDates AS d ON w.col1 == d.col1
AND w.col5 == d.col5
AND w.col6 == d.col6
AND w.filedate == d.filedate;
OUTPUT @output
TO "/output/output.csv"
ORDER BY col1, col2, col3
USING Outputters.Csv();
我的结果(符合您的预期结果:
HTH
我有一个基本流,并且我有多个不同连续日期的增量流。我想合并它们以获得最终流。我如何在 Azure Datalake 中完成此操作。例如,假设这些是流。我需要合并这些流以获得最终流。合并会将原始值替换为新值。目前delta流的数量已经超过100个
基本流:
1022918 300.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 1000.00 1000.00 2 7 6 100
1022920 2000.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 300.00 300.00 2 162 9 100
增量流 1:
1022918 400.00 300.00 2 7 5 100
1022919 2000.00 1000.00 2 7 6 100
1022920 3000.00 2000.00 2 170 6 100
1022922 400.00 300.00 2 162 9 100
增量流 2:
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
预期输出
1022918 400.00 300.00 2 7 5 100
1022918 400.00 400.00 2 170 5 100
1022919 2500.00 1000.00 2 7 6 100
1022920 3500.00 2000.00 2 170 6 100
1022921 3000.00 3000.00 2 123 7 100
1022922 100.00 100.00 2 162 7 100
1022922 200.00 200.00 2 123 9 100
1022922 400.00 300.00 2 162 9 100
Azure Data Lake Store 是一个仅附加文件系统。这意味着记录只能添加到文件的末尾。这与许多其他仅附加系统没有太大不同。 Azure Data Lake Analytics 和 Hive 等各种分析应用程序可用于逻辑合并这些基本流和增量流
实现这一点通常需要 4 个步骤。
加载基础数据 - 这很简单。加载基础数据时,您需要添加标识符。例如基准日期或基准版本。对于此讨论,假设您使用版本号。假设基础数据是版本 0
自己加载增量数据table/file - 加载它们时,您还需要有一个标识符,可用于与基础数据进行比较以确定最新记录。假设我们也有版本号。所以新的记录会是一个更大的版本号1、2、3等等。
构建合并视图 - 这是一个将基础数据与版本最大的 ID 上的增量数据连接起来的查询。现在当你得到这个视图时,这是所有最新的记录。
使用合并视图中的记录创建新的基础数据。
这里有一篇文章解释了如何使用 Hive 来实现这一点。这个使用datetime来识别最新的记录。
https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/
我已经使用您的测试数据在 U-SQL
中编写了一个示例。它采用上述方法,并根据您描述的唯一键和某种数据组件获取最后一条记录。
我使用了这些文件:
有几个假设:
- 文件是 space 分隔的
- 文件名(或可能是文件夹)包含日期部分
代码:
// Assumptions:
// col1, 5 and 6 is the intented unique key
// each file includes a date in the filename
DECLARE @baseFilesLocation string = "input/base_{filedate}.txt";
DECLARE @deltaFilesLocation string = "input/delta_{filedate}.txt";
// Get the base files
@baseFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @baseFilesLocation
USING Extractors.Text(delimiter : ' ');
// Get the delta files
@deltaFiles =
EXTRACT col1 int,
col2 decimal,
col3 decimal,
col4 int,
col5 int,
col6 int,
col7 int,
filedate string
FROM @deltaFilesLocation
USING Extractors.Text(delimiter : ' ', silent: true);
@working =
SELECT *
FROM @baseFiles
UNION ALL
SELECT *
FROM @deltaFiles;
// Work out the (col1, 5 and 6) and max filedate combination
@maxDates =
SELECT col1, col5, col6, MAX(filedate) AS filedate
FROM @working
GROUP BY col1, col5, col6;
// Join the original set (all base data, all delta files) to the max dates rowset, to get last record for each col1
@output = SELECT w.*
FROM @working AS w
SEMIJOIN @maxDates AS d ON w.col1 == d.col1
AND w.col5 == d.col5
AND w.col6 == d.col6
AND w.filedate == d.filedate;
OUTPUT @output
TO "/output/output.csv"
ORDER BY col1, col2, col3
USING Outputters.Csv();
我的结果(符合您的预期结果:
HTH