Aggregate over batches of records
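I want to aggregate Stream Analytics records in batches of n records, such that the following conditions hold:
- Each record is in exactly one batch.
- Each batch contains at most n records.
- When the n-th record of a batch arrives (or a timeout occurs), the window closes and the query is evaluated.
I have not found a way to do this with the windowing functions (because they are time-based, not count-based). Functions such as CollectTop do not work either, because they are evaluated per record rather than per batch.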
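As mentioned in the comments, Azure Stream Analytics (ASA) has no notion of event count.
To regroup events into batches of a given size, the first step is to rank them (CollectTop). ASA, like any stream processor, needs a time window over which to define that ranking.
That does not meet the requirement, as also noted in the comments. Sorry, ASA does not seem to be the solution here.
If you can tolerate a time window, meaning you accept latency (records are output at the end of the window) and incomplete batches (most windows will not contain a multiple of n events), then this can be partially achieved with a multi-step query.
With this input file and the following query: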
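For reference, the behavior the question asks for (close a batch on the n-th record or on a timeout, whichever comes first) can be sketched outside ASA in a few lines of Python. This is only an illustration of the requirement, not something ASA offers: the class name `CountOrTimeoutBatcher`, its methods, and the choice to measure the timeout from the first record of a batch are all assumptions made for this sketch.

```python
class CountOrTimeoutBatcher:
    """Hypothetical helper: groups records into batches of at most n,
    closing a batch when it is full or when `timeout` seconds have
    passed since its first record (an assumption of this sketch)."""

    def __init__(self, n, timeout):
        self.n = n
        self.timeout = timeout
        self._batch = []
        self._opened_at = None

    def add(self, record, now):
        """Add a record at time `now` (seconds); return the batches closed by this call."""
        closed = self.flush_if_expired(now)
        if not self._batch:
            self._opened_at = now          # first record opens a new batch
        self._batch.append(record)
        if len(self._batch) == self.n:     # n-th record closes the batch
            closed.append(self._close())
        return closed

    def flush_if_expired(self, now):
        """Close the open batch if its timeout has elapsed; call this periodically."""
        if self._batch and now - self._opened_at >= self.timeout:
            return [self._close()]
        return []

    def _close(self):
        batch, self._batch, self._opened_at = self._batch, [], None
        return batch
```

Each record ends up in exactly one batch, and no batch exceeds n records. A real consumer would call `flush_if_expired` from a timer so that a partially filled batch is still emitted when no further records arrive.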
-- Step 1: collect and rank every event of each 5-minute tumbling window
WITH Ranking AS (
    SELECT
        System.Timestamp() AS ts,
        COLLECTTOP(1000000) OVER (ORDER BY EntryTime, TollId) AS c
    FROM [entry] AS e TIMESTAMP BY EntryTime
    GROUP BY TumblingWindow(minute, 5)
),
-- Step 2: unfold the ranked array and derive a batch index from the rank (batch size 3)
Unfolding AS (
    SELECT
        System.Timestamp() AS windowEnd,
        FLOOR((d.ArrayValue.rank - 1) / 3.0) AS batch3,
        d.ArrayValue.rank,
        d.ArrayValue.value.*
    FROM Ranking AS r
    CROSS APPLY GetElements(r.c) AS d
),
-- Step 3: regroup the rows that share a window end and a batch index
Batching AS (
    SELECT
        System.Timestamp() AS ts,
        Collect() AS batch
    FROM Unfolding
    GROUP BY System.Timestamp(), batch3
)
SELECT * FROM Batching
We get:
{"ts":"2014-09-10T12:05:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:01:00.0000000Z","TollId":1,"LicensePlate":"JNB 7001","State":"NY","Make":"Honda","Model":"CRV","VehicleType":1,"VehicleWeight":0,"Toll":7,"Tag":null},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:02:00.0000000Z","TollId":1,"LicensePlate":"YXZ 1001","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":123456789},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:02:00.0000000Z","TollId":3,"LicensePlate":"ABC 1004","State":"CT","Make":"Ford","Model":"Taurus","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":456789123}]}
{"ts":"2014-09-10T12:05:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:03:00.0000000Z","TollId":1,"LicensePlate":"BNJ 1007","State":"NY","Make":"Honda","Model":"CRV","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":789123456},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:03:00.0000000Z","TollId":2,"LicensePlate":"XYZ 1003","State":"CT","Make":"Toyota","Model":"Corolla","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":6,"EntryTime":"2014-09-10T12:05:00.0000000Z","TollId":2,"LicensePlate":"CDE 1007","State":"NJ","Make":"Toyota","Model":"4x4","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":321987654}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:06:00.0000000Z","TollId":2,"LicensePlate":"BAC 1005","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":567891234},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:07:00.0000000Z","TollId":1,"LicensePlate":"ZYX 1002","State":"NY","Make":"Honda","Model":"Accord","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":234567891},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:07:00.0000000Z","TollId":2,"LicensePlate":"ZXY 1001","State":"PA","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":987654321}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:08:00.0000000Z","TollId":3,"LicensePlate":"CBA 1008","State":"PA","Make":"Ford","Model":"Mustang","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":891234567},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":2,"LicensePlate":"CDB 1003","State":"PA","Make":"Volvo","Model":"C30","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":765432198},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":2,"LicensePlate":"DCB 1004","State":"NY","Make":"Volvo","Model":"S80","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":654321987}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":7,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":3,"LicensePlate":"YZX 1009","State":"NY","Make":"Volvo","Model":"V70","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":912345678},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":8,"EntryTime":"2014-09-10T12:10:00.0000000Z","TollId":1,"LicensePlate":"CBD 1005","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":543219876},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":9,"EntryTime":"2014-09-10T12:10:00.0000000Z","TollId":3,"LicensePlate":"BCD 1002","State":"NY","Make":"Toyota","Model":"Rav4","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":876543219}]}
{"ts":"2014-09-10T12:15:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:11:00.0000000Z","TollId":1,"LicensePlate":"NJB 1006","State":"CT","Make":"Ford","Model":"Focus","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":678912345},{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:12:00.0000000Z","TollId":3,"LicensePlate":"PAC 1209","State":"NJ","Make":"Chevy","Model":"Malibu","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":219876543},{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:15:00.0000000Z","TollId":2,"LicensePlate":"BAC 1005","State":"PA","Make":"Peterbilt","Model":"389","VehicleType":2,"VehicleWeight":2.675,"Toll":15.5,"Tag":567891234}]}
{"ts":"2014-09-10T12:15:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:15:00.0000000Z","TollId":3,"LicensePlate":"EDC 3109","State":"NJ","Make":"Ford","Model":"Focus","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":198765432}]}
{"ts":"2014-09-10T12:20:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:18:00.0000000Z","TollId":2,"LicensePlate":"DEC 1008","State":"NY","Make":"Toyota","Model":"Corolla","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null},{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:20:00.0000000Z","TollId":1,"LicensePlate":"DBC 1006","State":"NY","Make":"Honda","Model":"Civic","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":432198765},{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:20:00.0000000Z","TollId":2,"LicensePlate":"APC 2019","State":"NJ","Make":"Honda","Model":"Civic","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":345678912}]}
{"ts":"2014-09-10T12:25:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:25:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:22:00.0000000Z","TollId":1,"LicensePlate":"EDC 1019","State":"NJ","Make":"Honda","Model":"Accord","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null}]}
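Here every batch is guaranteed to contain no more than N = 3 events, but that guarantee is enforced within a 5-minute window. Whenever the number of records in a window is not a multiple of N, the last batch of that window holds fewer than N records.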
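To make the three query steps easier to follow, here is a minimal Python simulation of the same idea: assign each event to a 5-minute tumbling window, rank the events of a window by (EntryTime, TollId), and derive the batch index as floor((rank - 1) / n). It is a sketch, not a reimplementation of ASA. The function names `window_end` and `batch_events` are made up for this example, and ranks are simply sequential positions here, whereas the output above shows that COLLECTTOP can give tied events the same rank.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def window_end(ts, size):
    """End of the tumbling window (start, end] that contains ts."""
    whole, remainder = divmod(ts - EPOCH, size)
    return EPOCH + (whole + (1 if remainder else 0)) * size

def batch_events(events, n=3, size=timedelta(minutes=5)):
    """Group events per window, then split each window into batches of at most n."""
    windows = defaultdict(list)
    for event in events:
        windows[window_end(event["EntryTime"], size)].append(event)

    result = []
    for end in sorted(windows):
        # Rank within the window (sequential positions, a simplification).
        ranked = sorted(windows[end], key=lambda e: (e["EntryTime"], e["TollId"]))
        batches = defaultdict(list)
        for rank, event in enumerate(ranked, start=1):
            batches[(rank - 1) // n].append(event)   # same formula as batch3
        for index, batch in batches.items():
            result.append({"ts": end, "batch3": index, "batch": batch})
    return result
```

An event whose timestamp falls exactly on a window boundary is placed in the window that ends at that instant, which matches the sample output above, where the 12:05:00 entry belongs to the batch stamped 12:05:00.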