Azure Stream Analytics Query to consolidate two events
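I am trying to combine two different events (EventB and EventC) coming from the same EventHub input. What I want to achieve is to output (to an Azure Function) one combined event (EventB + EventC) whenever an EventC is received.
The events look like this: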
{
"EventB": {
"Claim": {
"EventAUri": "A/123",
"Uri": "B/456"
},
"Metainfo": {
"Color": "Green"
}
}
}
and
{
"EventC" : {
"Claim": {
"EventBUri": "B/456"
},
"Target": {
"City": "Berlin",
"Position": {
"Latitude": 50.325096,
"Longitude": 72.19710
}
}
}
}
EventB is sent only once, while EventC is sent several times per minute. The desired output for the example above would be:
{
"Claim": {
"EventBUri": "B/456"
},
"Target": {
"City": "Berlin",
"Position": {
"Latitude": 50.325096,
"Longitude": 72.19710
}
},
"BMetainfo": {
"Color": "Green"
}
}
This is what I have tried so far:
WITH AllEvents AS (
SELECT
*
FROM
ehubinput
),
EventB AS (
select
EventB
From AllEvents
Where EventB Is Not NUll
),
EventC AS (
select EventC
from AllEvents
Where EventC Is Not NUll
)
Select * From EventB
Inner Join EventC
On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
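Unfortunately, my code outputs EventB + x (for each EventC) * EventC instead of EventB + last EventC.
Can anyone help me solve this?
Update:
This is my current output. (I only want the latest EventC combined with EventB, not every event in the stream, which is what I get now.)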
I reproduced your case and came up with the following query:
WITH AllEvents AS (
SELECT
*
FROM
Input
),
EventB AS (
select
EventB
From AllEvents
Where EventB Is Not NUll
),
EventC AS (
select EventC, EventC.Time
from AllEvents
Where EventC Is Not NUll
),
test as (
Select *, EventC.* From EventB
Inner Join EventC
On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri)
select topone() over (order by Time) from test GROUP BY TumblingWindow(second, 10)
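For a series of events, it always returns the last matching (EventC, EventB) pair. If this is not your expected output, could you write down the expected output for the input specified above?
I used VS2019 with the Stream Analytics extension, and I specified the local input based on your description above.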
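To make the difference between the plain join and the windowed pick easier to see, here is a minimal plain-Python simulation. It is not Stream Analytics code: the helper names `join_events` and `latest_per_window`, the integer `Time` values, and the city "Munich" are assumptions made up for illustration. It only mirrors the behavior described above (one joined row per EventC, then the row with the greatest `Time` per 10-second tumbling window). Whether `order by Time` in `topone()` needs an explicit `DESC` to pick the newest row is worth checking against the TopOne documentation.

```python
from collections import defaultdict


def join_events(events_b, events_c):
    """Inner join on EventB.Claim.Uri == EventC.Claim.EventBUri.

    Yields one row per matching EventC, which is why the original
    query emits a combined event for every EventC in the stream.
    """
    return [
        {"EventB": b, "EventC": c}
        for b in events_b
        for c in events_c
        if b["Claim"]["Uri"] == c["Claim"]["EventBUri"]
    ]


def latest_per_window(rows, window_seconds=10):
    """Bucket joined rows into tumbling windows and keep the newest row of each."""
    windows = defaultdict(list)
    for row in rows:
        # Hypothetical integer 'Time' in seconds; the bucket index is the window.
        windows[row["EventC"]["Time"] // window_seconds].append(row)
    return [
        max(group, key=lambda r: r["EventC"]["Time"])
        for _, group in sorted(windows.items())
    ]


# Illustrative sample data (times and the middle city are invented).
events_b = [
    {"Claim": {"EventAUri": "A/123", "Uri": "B/456"}, "Metainfo": {"Color": "Green"}}
]
events_c = [
    {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}, "Time": 1},
    {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Munich"}, "Time": 4},
    {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Wasserburg"}, "Time": 7},
]

joined = join_events(events_b, events_c)
picked = latest_per_window(joined)
print(len(joined), [row["EventC"]["Target"]["City"] for row in picked])
```

With this sample data the join produces three rows, and the windowed pick reduces them to the single row for 'Wasserburg'.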
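Update
The query has been updated. I noticed that only the last EventC in the sample payload contains the property 'Time'. If you set that property on every EventC and use the query above, you will get 'Wasserburg' as the result.
Of course, the output still has to be formatted, but the result is correct in this case.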
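As a rough illustration of the formatting step, the sketch below reshapes one matched pair into the output shape requested in the question. It is plain Python rather than a Stream Analytics projection, and the function name `format_output` is hypothetical. In the query itself the same reshaping would have to be done in the final SELECT.

```python
def format_output(event_b, event_c):
    """Build the combined event: EventC's Claim and Target plus EventB's Metainfo."""
    return {
        "Claim": event_c["Claim"],
        "Target": event_c["Target"],
        "BMetainfo": event_b["Metainfo"],
    }


# The two sample events from the question, without their outer wrapper keys.
event_b = {
    "Claim": {"EventAUri": "A/123", "Uri": "B/456"},
    "Metainfo": {"Color": "Green"},
}
event_c = {
    "Claim": {"EventBUri": "B/456"},
    "Target": {
        "City": "Berlin",
        "Position": {"Latitude": 50.325096, "Longitude": 72.19710},
    },
}

combined = format_output(event_b, event_c)
print(combined)
```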
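Further update
I played with it some more because I found it really interesting, and came up with the following query, which is conceptually different from the previous one and, I would say, more precise: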
-- timestamp by how events are enqueued
WITH AllEvents AS (
SELECT
Input
FROM
Input timestamp by input.EventEnqueuedUtcTime
),
-- get the last eventB, because only last eventB is relevant
EventB AS (
select last(AllEvents.Input) over (limit duration(minute, 1) when AllEvents.input.EventB Is Not NUll) as EventB
From AllEvents
),
LastB as (select topone() over (order by EventB.Time) from EventB GROUP BY slidingwindow(second, 60)),
-- get the last eventC
EventC AS (
select last(AllEvents.Input) over (limit duration(minute, 1) when AllEvents.input.EventC Is Not NUll) as EventC
From AllEvents
),
LastC as (select topone() over (order by EventC.Time) from EventC GROUP BY slidingwindow(second, 60)),
-- create the result if the join between last EventB and last EventC exists
ResultJoin as (
Select LastB.topone.*, LastC.topone.* From LastB
Inner Join LastC
On DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri)
-- get the last event that is a pair of EventB,EventC
select topone() over (order by EventB.Time) into Output from ResultJoin GROUP BY slidingwindow(second, 60)
-- Just a cross-check what is the last event B
select * into Output1 from LastB
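I used a time window function because you mentioned that the events arrive within a minute. Essentially, the idea is to extract the last B event and the last C event, and then propagate the matching pair to the output.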
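The idea can be sketched in plain Python as follows. This is only a simulation under stated assumptions, not the engine's exact windowing semantics: the helper names `last_in_window` and `latest_pair`, the `enqueued` field (integer seconds standing in for the enqueue time), and the sample stream are all invented for illustration.

```python
def last_in_window(events, kind, now, window_seconds=60):
    """Return the most recently enqueued event of the given kind inside the window, or None."""
    candidates = [
        e
        for e in events
        if kind in e["body"] and 0 <= now - e["enqueued"] <= window_seconds
    ]
    return max(candidates, key=lambda e: e["enqueued"], default=None)


def latest_pair(events, now, window_seconds=60):
    """Pair the last EventB with the last EventC if both exist and their URIs match."""
    last_b = last_in_window(events, "EventB", now, window_seconds)
    last_c = last_in_window(events, "EventC", now, window_seconds)
    if last_b is None or last_c is None:
        return None
    b = last_b["body"]["EventB"]
    c = last_c["body"]["EventC"]
    if b["Claim"]["Uri"] != c["Claim"]["EventBUri"]:
        return None
    return {"EventB": b, "EventC": c}


# Hypothetical stream: one EventB followed by two EventC messages.
stream = [
    {"enqueued": 0, "body": {"EventB": {"Claim": {"Uri": "B/456"}, "Metainfo": {"Color": "Green"}}}},
    {"enqueued": 10, "body": {"EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}}}},
    {"enqueued": 40, "body": {"EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Wasserburg"}}}},
]

pair_at_45 = latest_pair(stream, now=45)
pair_at_70 = latest_pair(stream, now=70)
print(pair_at_45["EventC"]["Target"]["City"], pair_at_70)
```

Note the consequence of the fixed window in this sketch: once the single EventB is older than 60 seconds, no pair is produced any more, so the window length has to cover the time between EventB and the EventC messages that should be joined to it.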
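I tested it on a real Event Hub with an event hub message publisher, so that I could simulate a stream of events similar to your example.
I then observed the output locally to check whether I got the correct result after the last event.
In addition, I added a Time property to every event (B and C), as you can see in the message simulator, because that property is used to order the events in the query. Of course, you can replace it with another property, such as EventEnqueuedUtcTime or something similar.
I hope you find one of these two approaches useful for your final solution.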