Defining an EXTRACT range from a SELECT statement
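I intend to process, in batches, a dataset from EventHub that is stored in ADLA. Processing by interval seems logical to me, where my dates fall between my last execution datetime and the current execution datetime.
I thought about saving the execution timestamps in a table so that I can keep track of them, and doing the following: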
DECLARE @my_file string = @"/data/raw/my-ns/my-eh/{date:yyyy}/{date:MM}/{date:dd}/{date:HH}/{date:mm}/{date:ss}/{*}.avro";
DECLARE @max_datetime DateTime = DateTime.Now;
@min_datetime =
SELECT (DateTime) MAX(execution_datetime) AS min_datetime
FROM my_adldb.dbo.watermark;
@my_json_bytes =
EXTRACT Body byte[],
date DateTime
FROM @my_file
USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
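How can I properly add this interval to my EXTRACT query? I tested it using a plain WHERE clause with a manually defined interval and it worked, but when I try to use @min_datetime it does not work, because its result is a rowset.
I thought about applying the filtering in a subsequent query, but I am afraid that this means @my_json_bytes will extract my whole dataset and only filter it afterwards, resulting in a suboptimal query.
Thanks in advance.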
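You should be able to apply the filter in a later SELECT. U-SQL can push predicates up under certain conditions, but I have not been able to test this yet. Try something like this: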
@min_datetime =
SELECT (DateTime) MAX(execution_datetime) AS min_datetime
FROM my_adldb.dbo.watermark;
@my_json_bytes =
EXTRACT Body byte[],
date DateTime
FROM @my_file
USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
@working =
SELECT *
FROM @my_json_bytes AS j
CROSS JOIN
@min_datetime AS t
WHERE j.date > t.min_datetime;
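The question describes an interval with both a lower and an upper bound, while the query above only applies the lower one. A minimal sketch of the same join with both bounds might look like the following. It assumes the @my_json_bytes and @min_datetime rowsets and the @max_datetime variable defined earlier, and it lists the columns explicitly so the watermark column is not carried into the result. Whether the optimizer actually pushes this predicate into the file set, and so skips files outside the interval, is an assumption that should be checked against the job graph.

```usql
// Sketch only: relies on @my_json_bytes, @min_datetime and @max_datetime
// being defined earlier in the same script.
@working =
    SELECT j.Body,
           j.date
    FROM @my_json_bytes AS j
         CROSS JOIN
             @min_datetime AS t
    // Lower bound comes from the single-row watermark rowset,
    // upper bound from the scalar variable declared at the top.
    WHERE j.date > t.min_datetime
          AND j.date <= @max_datetime;
```

If the job graph shows every file being read, one possible alternative is to look up the watermark outside the script and pass it in as a scalar parameter, so that the filter on date compares against a scalar value rather than a joined column.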