如何在 Azure DataLake 中合并基础和多个增量结构化流

How to merge base and multiple delta structured streams in Azure DataLake

我有一个基本流,并且我有多个不同连续日期的增量流。我想合并它们以获得最终流。我如何在 Azure Datalake 中完成此操作。例如,假设这些是流。我需要合并这些流以获得最终流。合并会将原始值替换为新值。目前delta流的数量已经超过100个

基本流:

1022918 300.00  300.00  2   7   5   100
1022918 400.00  400.00  2   170 5   100
1022919 1000.00 1000.00 2   7   6   100
1022920 2000.00 2000.00 2   170 6   100
1022921 3000.00 3000.00 2   123 7   100
1022922 100.00  100.00  2   162 7   100
1022922 200.00  200.00  2   123 9   100
1022922 300.00  300.00  2   162 9   100

增量流 1:

1022918 400.00  300.00  2   7   5   100
1022919 2000.00 1000.00 2   7   6   100
1022920 3000.00 2000.00 2   170 6   100
1022922 400.00  300.00  2   162 9   100

增量流 2:

1022919 2500.00 1000.00 2   7   6   100
1022920 3500.00 2000.00 2   170 6   100

预期输出

1022918 400.00  300.00  2   7   5   100
1022918 400.00  400.00  2   170 5   100
1022919 2500.00 1000.00 2   7   6   100
1022920 3500.00 2000.00 2   170 6   100
1022921 3000.00 3000.00 2   123 7   100
1022922 100.00  100.00  2   162 7   100
1022922 200.00  200.00  2   123 9   100
1022922 400.00  300.00  2   162 9   100

Azure Data Lake Store 是一个仅附加文件系统。这意味着记录只能添加到文件的末尾。这与许多其他仅附加系统没有太大不同。 Azure Data Lake Analytics 和 Hive 等各种分析应用程序可用于逻辑合并这些基本流和增量流

实现这一点通常需要 4 个步骤。

  1. 加载基础数据 - 这很简单。加载基础数据时,您需要添加标识符。例如基准日期或基准版本。对于此讨论,假设您使用版本号。假设基础数据是版本 0

  2. 自己加载增量数据table/file - 加载它们时,您还需要有一个标识符,可用于与基础数据进行比较以确定最新记录。假设我们也有版本号。所以新的记录会是一个更大的版本号1、2、3等等。

  3. 构建合并视图 - 这是一个将基础数据与版本最大的 ID 上的增量数据连接起来的查询。现在当你得到这个视图时,这是所有最新的记录。

  4. 使用合并视图中的记录创建新的基础数据。

这里有一篇文章解释了如何使用 Hive 来实现这一点。这个使用datetime来识别最新的记录。

https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/

我已经使用您的测试数据在 U-SQL 中编写了一个示例。它采用上述方法,并根据您描述的唯一键和某种数据组件获取最后一条记录。

我使用了这些文件:

有几个假设:

  • 文件是 space 分隔的
  • 文件名(或可能是文件夹)包含日期部分

代码:

// Assumptions: 
//      col1, 5 and 6 is the intented unique key
//      each file includes a date in the filename

DECLARE @baseFilesLocation string = "input/base_{filedate}.txt";
DECLARE @deltaFilesLocation string = "input/delta_{filedate}.txt";


// Get the base files
@baseFiles =
    EXTRACT col1 int,
            col2 decimal,
            col3 decimal,
            col4 int,
            col5 int,
            col6 int,
            col7 int,
            filedate string

    FROM @baseFilesLocation
    USING Extractors.Text(delimiter : ' ');


// Get the delta files
@deltaFiles =
    EXTRACT col1 int,
            col2 decimal,
            col3 decimal,
            col4 int,
            col5 int,
            col6 int,
            col7 int,
            filedate string

    FROM @deltaFilesLocation
    USING Extractors.Text(delimiter : ' ', silent: true);


@working = 
    SELECT *
    FROM @baseFiles
    UNION ALL
    SELECT *
    FROM @deltaFiles;


// Work out the (col1, 5 and 6) and max filedate combination
@maxDates = 
    SELECT col1, col5, col6, MAX(filedate) AS filedate
    FROM @working
    GROUP BY col1, col5, col6;


// Join the original set (all base data, all delta files) to the max dates rowset, to get last record for each col1
@output = SELECT w.*
    FROM @working AS w
        SEMIJOIN @maxDates AS d ON w.col1 == d.col1
            AND w.col5 == d.col5
            AND w.col6 == d.col6
            AND w.filedate == d.filedate;


OUTPUT @output
    TO "/output/output.csv"
ORDER BY col1, col2, col3
USING Outputters.Csv();

我的结果(符合您的预期结果:

HTH