虚拟文件集列和行集变量 U-SQL

virtual file set column and rowset variable U-SQL

我在数据工厂中安排作业时遇到问题。 我正在尝试每小时处理一个计划的作业,该作业将在不同条件下每小时执行相同的脚本。

假设我有一堆 Avro 文件在 Azure Data Lake Store 中传播,模式如下。 /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}{date:MM}{date:dd} __{日期:H}

每小时都有新文件添加到 Data Lake Store。 为了仅在我决定借助 U-SQL 虚拟文件集列和我在 Data Lake Store 中创建的一些 SyncTable 来处理它们时才处理这些文件。

我的查询如下所示。

DECLARE @file_set_path string = /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H};
@result = EXTRACT [Id] long,
....
date DateTime
FROM @file_set_path 
USING someextractor;

@rdate =
    SELECT MAX(ProcessedDate) AS ProcessedDate
    FROM dbo.SyncTable 
    WHERE EntityName== "SomeEntity";

@finalResult = SELECT [Id],... FROM @result
CROSS JOIN @rdate AS r
WHERE date >= r.ProcessedDate;

因为我不能在 where 子句中使用 rowset 变量,所以我将单行与 set 交叉连接,但是即使在这种情况下,U-SQL 也找不到正确的文件并且总是 return所有文件设置。

是否有任何解决方法或其他方法?

我认为这种方法应该行得通,除非某处不太正确,即您可以确认 dbo.SyncTable table 的数据类型吗?转储 @rdate 并确保您获得的值符合您的预期。

我整理了一个按预期运行的简单演示。我的 SyncTable 副本有一条值为 01/01/2018 的记录:

@working = 
    SELECT *
    FROM (
        VALUES
            ( (int)1, DateTime.Parse("2017/12/31") ), 
            ( (int)2, DateTime.Parse("2018/01/01") ),
            ( (int)3, DateTime.Parse("2018/02/01") )
     ) AS x ( id, someDate );


@rdate =
    SELECT MAX(ProcessedDate) AS maxDate
    FROM dbo.SyncTable;

//@output =
//    SELECT *
//    FROM @rdate;

@output =
    SELECT *, (w.someDate - r.maxDate).ToString() AS diff
    FROM @working AS w
         CROSS JOIN
             @rdate AS r
    WHERE w.someDate >= r.maxDate;


OUTPUT @output TO "/output/output.csv"
USING Outputters.Csv();

我确实尝试过使用文件路径(完整脚本 here)。要记住的是自定义日期格式 H 将小时表示为从 0 到 23 的数字。如果您的 SyncTable 日期在插入时没有时间组件,它将默认为午夜 (0),这意味着整个天将被收集。根据您的模式,您的文件结构应如下所示:

"D:\Data Lake\USQLDataRoot\Data\SomeEntity17\SomeEntity_2017_12_31__8\test.csv"

我注意到你的文件路径在第二部分有下划线,在小时部分之前有一个双下划线(介于 0 和 23 之间,一位数到 10 小时)。我注意到您的文件集路径没有文件类型或引号 - 我在测试中使用了 test.csv。我的结果:

基本上我认为这种方法会奏效,但有些地方不太对劲,可能在你的文件结构、SyncTable 中的值、数据类型等方面。你需要检查细节,将中间值转储到检查直到找到问题。

wBob 的完整脚本的要点不能解决您的问题吗?这是 wBob 的完整脚本的一个经过轻微编辑的版本,用于解决您提出的一些问题:

  1. 能够过滤 SyncTable,

  2. 模式的最后一部分是文件名而不是文件夹。示例文件和结构:\Data\SomeEntity18\SomeEntity_2018_01_01__1

DECLARE @file_set_path string = @"/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H}";

@input =
EXTRACT [Id] long,
        date DateTime
FROM @file_set_path
USING Extractors.Text();

// in lieu of creating actual table
@syncTable = 
SELECT * FROM 
    ( VALUES
    ( "SomeEntity",     new DateTime(2018,01,01,01,00,00) ), 
    ( "AnotherEntity",  new DateTime(2018,01,01,01,00,00) ), 
    ( "SomeEntity",     new DateTime(2018,01,01,00,00,00) ), 
    ( "AnotherEntity",  new DateTime(2018,01,01,00,00,00) ), 
    ( "SomeEntity",     new DateTime(2017,12,31,23,00,00) ), 
    ( "AnotherEntity",  new DateTime(2017,12,31,23,00,00) )
    ) AS x ( EntityName, ProcessedDate );

@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM @syncTable
WHERE EntityName== "SomeEntity";

@output =
SELECT *,
       date.ToString() AS dateString
FROM @input AS i
     CROSS JOIN
         @rdate AS r
WHERE i.date >= r.maxDate;


OUTPUT @output 
TO "/output/output.txt"
ORDER BY Id
USING Outputters.Text(quoting:false);

另请注意,文件集无法对动态连接执行分区消除,因为优化器在准备阶段不知道这些值。

我建议将同步点作为参数从 ADF 传递到处理脚本。然后该值为优化器所知,文件集分区消除将开始。在最坏的情况下,您将不得不从上一个脚本中的同步 table 中读取该值,并将其用作下一个脚本中的参数.