虚拟文件集列和行集变量 U-SQL
virtual file set column and rowset variable U-SQL
我在数据工厂中安排作业时遇到问题。
我正在尝试每小时处理一个计划的作业,该作业将在不同条件下每小时执行相同的脚本。
假设我有一堆 Avro 文件在 Azure Data Lake Store 中传播,模式如下。
/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}{date:MM}{date:dd} __{日期:H}
每小时都有新文件添加到 Data Lake Store。
为了仅在我决定借助 U-SQL 虚拟文件集列和我在 Data Lake Store 中创建的一些 SyncTable 来处理它们时才处理这些文件。
我的查询如下所示。
DECLARE @file_set_path string = /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H};
@result = EXTRACT [Id] long,
....
date DateTime
FROM @file_set_path
USING someextractor;
@rdate =
SELECT MAX(ProcessedDate) AS ProcessedDate
FROM dbo.SyncTable
WHERE EntityName== "SomeEntity";
@finalResult = SELECT [Id],... FROM @result
CROSS JOIN @rdate AS r
WHERE date >= r.ProcessedDate;
因为我不能在 where 子句中使用 rowset 变量,所以我将单行与 set 交叉连接,但是即使在这种情况下,U-SQL 也找不到正确的文件并且总是 return所有文件设置。
是否有任何解决方法或其他方法?
我认为这种方法应该行得通,除非某处不太正确,即您可以确认 dbo.SyncTable
table 的数据类型吗?转储 @rdate
并确保您获得的值符合您的预期。
我整理了一个按预期运行的简单演示。我的 SyncTable 副本有一条值为 01/01/2018 的记录:
@working =
SELECT *
FROM (
VALUES
( (int)1, DateTime.Parse("2017/12/31") ),
( (int)2, DateTime.Parse("2018/01/01") ),
( (int)3, DateTime.Parse("2018/02/01") )
) AS x ( id, someDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM dbo.SyncTable;
//@output =
// SELECT *
// FROM @rdate;
@output =
SELECT *, (w.someDate - r.maxDate).ToString() AS diff
FROM @working AS w
CROSS JOIN
@rdate AS r
WHERE w.someDate >= r.maxDate;
OUTPUT @output TO "/output/output.csv"
USING Outputters.Csv();
我确实尝试过使用文件路径(完整脚本 here)。要记住的是自定义日期格式 H 将小时表示为从 0 到 23 的数字。如果您的 SyncTable 日期在插入时没有时间组件,它将默认为午夜 (0),这意味着整个天将被收集。根据您的模式,您的文件结构应如下所示:
"D:\Data Lake\USQLDataRoot\Data\SomeEntity17\SomeEntity_2017_12_31__8\test.csv"
我注意到你的文件路径在第二部分有下划线,在小时部分之前有一个双下划线(介于 0 和 23 之间,一位数到 10 小时)。我注意到您的文件集路径没有文件类型或引号 - 我在测试中使用了 test.csv
。我的结果:
基本上我认为这种方法会奏效,但有些地方不太对劲,可能在你的文件结构、SyncTable 中的值、数据类型等方面。你需要检查细节,将中间值转储到检查直到找到问题。
wBob 的完整脚本的要点不能解决您的问题吗?这是 wBob 的完整脚本的一个经过轻微编辑的版本,用于解决您提出的一些问题:
能够过滤 SyncTable,
模式的最后一部分是文件名而不是文件夹。示例文件和结构:\Data\SomeEntity18\SomeEntity_2018_01_01__1
DECLARE @file_set_path string = @"/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H}";
@input =
EXTRACT [Id] long,
date DateTime
FROM @file_set_path
USING Extractors.Text();
// in lieu of creating actual table
@syncTable =
SELECT * FROM
( VALUES
( "SomeEntity", new DateTime(2018,01,01,01,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,01,00,00) ),
( "SomeEntity", new DateTime(2018,01,01,00,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,00,00,00) ),
( "SomeEntity", new DateTime(2017,12,31,23,00,00) ),
( "AnotherEntity", new DateTime(2017,12,31,23,00,00) )
) AS x ( EntityName, ProcessedDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM @syncTable
WHERE EntityName== "SomeEntity";
@output =
SELECT *,
date.ToString() AS dateString
FROM @input AS i
CROSS JOIN
@rdate AS r
WHERE i.date >= r.maxDate;
OUTPUT @output
TO "/output/output.txt"
ORDER BY Id
USING Outputters.Text(quoting:false);
另请注意,文件集无法对动态连接执行分区消除,因为优化器在准备阶段不知道这些值。
我建议将同步点作为参数从 ADF 传递到处理脚本。然后该值为优化器所知,文件集分区消除将开始。在最坏的情况下,您将不得不从上一个脚本中的同步 table 中读取该值,并将其用作下一个脚本中的参数.
我在数据工厂中安排作业时遇到问题。 我正在尝试每小时处理一个计划的作业,该作业将在不同条件下每小时执行相同的脚本。
假设我有一堆 Avro 文件在 Azure Data Lake Store 中传播,模式如下。 /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}{date:MM}{date:dd} __{日期:H}
每小时都有新文件添加到 Data Lake Store。 为了仅在我决定借助 U-SQL 虚拟文件集列和我在 Data Lake Store 中创建的一些 SyncTable 来处理它们时才处理这些文件。
我的查询如下所示。
DECLARE @file_set_path string = /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H};
@result = EXTRACT [Id] long,
....
date DateTime
FROM @file_set_path
USING someextractor;
@rdate =
SELECT MAX(ProcessedDate) AS ProcessedDate
FROM dbo.SyncTable
WHERE EntityName== "SomeEntity";
@finalResult = SELECT [Id],... FROM @result
CROSS JOIN @rdate AS r
WHERE date >= r.ProcessedDate;
因为我不能在 where 子句中使用 rowset 变量,所以我将单行与 set 交叉连接,但是即使在这种情况下,U-SQL 也找不到正确的文件并且总是 return所有文件设置。
是否有任何解决方法或其他方法?
我认为这种方法应该行得通,除非某处不太正确,即您可以确认 dbo.SyncTable
table 的数据类型吗?转储 @rdate
并确保您获得的值符合您的预期。
我整理了一个按预期运行的简单演示。我的 SyncTable 副本有一条值为 01/01/2018 的记录:
@working =
SELECT *
FROM (
VALUES
( (int)1, DateTime.Parse("2017/12/31") ),
( (int)2, DateTime.Parse("2018/01/01") ),
( (int)3, DateTime.Parse("2018/02/01") )
) AS x ( id, someDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM dbo.SyncTable;
//@output =
// SELECT *
// FROM @rdate;
@output =
SELECT *, (w.someDate - r.maxDate).ToString() AS diff
FROM @working AS w
CROSS JOIN
@rdate AS r
WHERE w.someDate >= r.maxDate;
OUTPUT @output TO "/output/output.csv"
USING Outputters.Csv();
我确实尝试过使用文件路径(完整脚本 here)。要记住的是自定义日期格式 H 将小时表示为从 0 到 23 的数字。如果您的 SyncTable 日期在插入时没有时间组件,它将默认为午夜 (0),这意味着整个天将被收集。根据您的模式,您的文件结构应如下所示:
"D:\Data Lake\USQLDataRoot\Data\SomeEntity17\SomeEntity_2017_12_31__8\test.csv"
我注意到你的文件路径在第二部分有下划线,在小时部分之前有一个双下划线(介于 0 和 23 之间,一位数到 10 小时)。我注意到您的文件集路径没有文件类型或引号 - 我在测试中使用了 test.csv
。我的结果:
基本上我认为这种方法会奏效,但有些地方不太对劲,可能在你的文件结构、SyncTable 中的值、数据类型等方面。你需要检查细节,将中间值转储到检查直到找到问题。
wBob 的完整脚本的要点不能解决您的问题吗?这是 wBob 的完整脚本的一个经过轻微编辑的版本,用于解决您提出的一些问题:
能够过滤 SyncTable,
模式的最后一部分是文件名而不是文件夹。示例文件和结构:
\Data\SomeEntity18\SomeEntity_2018_01_01__1
DECLARE @file_set_path string = @"/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H}";
@input =
EXTRACT [Id] long,
date DateTime
FROM @file_set_path
USING Extractors.Text();
// in lieu of creating actual table
@syncTable =
SELECT * FROM
( VALUES
( "SomeEntity", new DateTime(2018,01,01,01,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,01,00,00) ),
( "SomeEntity", new DateTime(2018,01,01,00,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,00,00,00) ),
( "SomeEntity", new DateTime(2017,12,31,23,00,00) ),
( "AnotherEntity", new DateTime(2017,12,31,23,00,00) )
) AS x ( EntityName, ProcessedDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM @syncTable
WHERE EntityName== "SomeEntity";
@output =
SELECT *,
date.ToString() AS dateString
FROM @input AS i
CROSS JOIN
@rdate AS r
WHERE i.date >= r.maxDate;
OUTPUT @output
TO "/output/output.txt"
ORDER BY Id
USING Outputters.Text(quoting:false);
另请注意,文件集无法对动态连接执行分区消除,因为优化器在准备阶段不知道这些值。
我建议将同步点作为参数从 ADF 传递到处理脚本。然后该值为优化器所知,文件集分区消除将开始。在最坏的情况下,您将不得不从上一个脚本中的同步 table 中读取该值,并将其用作下一个脚本中的参数.