Azure Stream Analytics Query to consolidate two events

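I am trying to merge two different events (EventB and EventC) that arrive on the same EventHub input. What I want is to output (to an Azure Function) one merged event (EventB + EventC) whenever an EventC is received.

The events look like this: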

{
    "EventB": {
        "Claim": {
            "EventAUri": "A/123",
            "Uri": "B/456"
        },
        "Metainfo": {
            "Color": "Green"
        }
    }   
}

{
    "EventC" : {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        }
    }
}

EventB is sent only once, whereas EventC is sent several times per minute. The desired output for the example above is:

    {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        },
        "BMetainfo": {
            "Color": "Green"
        }
    }
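
To make the target shape concrete, here is a minimal Python sketch of the mapping from the two sample payloads to the desired output. It is an illustration only and not part of the Stream Analytics job; `merge_events` is a hypothetical helper name.

```python
def merge_events(event_b: dict, event_c: dict) -> dict:
    """Build the merged output from one EventB and one EventC payload."""
    b = event_b["EventB"]
    c = event_c["EventC"]
    return {
        "Claim": c["Claim"],         # the claim is taken from EventC
        "Target": c["Target"],       # the target is taken from EventC
        "BMetainfo": b["Metainfo"],  # EventB's Metainfo, renamed to BMetainfo
    }


# The two sample events from above.
event_b = {
    "EventB": {
        "Claim": {"EventAUri": "A/123", "Uri": "B/456"},
        "Metainfo": {"Color": "Green"},
    }
}
event_c = {
    "EventC": {
        "Claim": {"EventBUri": "B/456"},
        "Target": {
            "City": "Berlin",
            "Position": {"Latitude": 50.325096, "Longitude": 72.19710},
        },
    }
}

merged = merge_events(event_b, event_c)
```

The two events belong together because `EventB.Claim.Uri` equals `EventC.Claim.EventBUri`.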

This is what I have tried so far:

WITH AllEvents AS (
    SELECT 
        *
    FROM
        ehubinput
),
EventB AS (
select
    EventB
From AllEvents
Where EventB Is Not NUll
),
EventC AS (
    select EventC
from AllEvents
Where EventC Is Not NUll
)

Select * From EventB 
 Inner Join EventC 
On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5 
AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri

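Unfortunately, my query outputs EventB joined with every EventC (one row per EventC) instead of EventB joined with only the last EventC.

Can anyone help me with this?

Update:

This is my input.

This is my current output. (I only want the latest EventC combined with EventB, not every event in the stream as I get now.)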

I reproduced your case and came up with the following query:

WITH AllEvents AS (
  SELECT 
    *
  FROM
  Input
),
EventB AS (
 select
 EventB
 From AllEvents
 Where EventB Is Not NUll
),
EventC AS (
  select EventC, EventC.Time
  from AllEvents
  Where EventC Is Not NUll
),
test as (
  Select *, EventC.* From EventB 
  Inner Join EventC 
  On DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5 
 AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri)


select topone() over (order by Time) from test  GROUP BY TumblingWindow(second, 10)   

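For a series of events, it always returns the last matching (EventC, EventB) pair. If this is not the output you expect, could you write down the expected output for the input specified above?

I used VS2019 with the Stream Analytics extension. I defined a local input based on your description above.

Update

The query has been updated. I noticed that only the last EventC in the sample payload contains the property 'Time'. If you set that property on every EventC and use the query above, you will get 'Wasserburg' as the result.

Of course, the output still has to be formatted, but the result is correct in this case.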
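
As a rough illustration of what the final step is meant to achieve (keep only the latest joined pair per 10-second tumbling window), here is a small Python simulation. It is a sketch under assumptions, not a description of how Stream Analytics is implemented: timestamps are plain seconds, the window is treated as start-exclusive and end-inclusive, `latest_pair_per_window` is a hypothetical helper, and the cities other than Berlin and Wasserburg are made up for the example.

```python
import math


def latest_pair_per_window(pairs, window_seconds=10):
    """Return {window_end: payload}, keeping the latest payload per window.

    `pairs` is an iterable of (timestamp_seconds, payload) tuples.
    Assumption: a window ending at T covers timestamps in (T - size, T].
    """
    best = {}
    for ts, payload in pairs:
        window_end = math.ceil(ts / window_seconds) * window_seconds
        # Keep this payload only if it is newer than the one stored so far.
        if window_end not in best or ts > best[window_end][0]:
            best[window_end] = (ts, payload)
    return {end: payload for end, (_, payload) in sorted(best.items())}


# Joined (EventB, EventC) pairs, reduced to the city of the EventC for brevity.
pairs = [(1, "Berlin"), (4, "Munich"), (9, "Wasserburg"), (12, "Hamburg")]
result = latest_pair_per_window(pairs)
```

Here the first three pairs fall into the window ending at second 10 and only the newest one survives, while the pair at second 12 opens the next window.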

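Further update: I played with this some more because I found it really interesting, and came up with the following query, which is conceptually different from the previous one and, I would say, more precise: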

-- timestamp by how events are enqueued
WITH AllEvents AS (
    SELECT 
       Input
     FROM 
     Input timestamp by input.EventEnqueuedUtcTime  
    ),

-- get the last eventB, because only last eventB is relevant
EventB AS (
    select last(AllEvents.Input) over (limit duration(minute, 1)  when AllEvents.input.EventB Is Not NUll)  as EventB 
    From AllEvents 
 ),
 LastB as (select topone() over (order by EventB.Time) from EventB GROUP BY  slidingwindow(second, 60)),

 -- get the last eventC
 EventC AS (
    select last(AllEvents.Input) over (limit duration(minute, 1)  when AllEvents.input.EventC Is Not NUll)  as EventC 
    From AllEvents 
 ),
LastC as (select topone() over (order by EventC.Time) from EventC GROUP BY  slidingwindow(second, 60)),

-- create the result if the join between last EventB and last EventC exists
ResultJoin as (
   Select LastB.topone.*, LastC.topone.* From LastB 
   Inner Join LastC 
   On DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60 
   AND LastB.topone.EventB.EventB.Claim.Uri  = LastC.topone.EventC.EventC.Claim.EventBUri)

-- get the last event that is a pair of EventB,EventC
select topone() over (order by EventB.Time) into Output from ResultJoin  GROUP BY  slidingwindow(second, 60)

-- Just a cross-check what is the last event B
select * into Output1 from LastB

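I used time window functions because you mentioned that the events arrive within a minute. So essentially the idea is to extract the last B event and the last C event, and then propagate the matching pair to the output.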
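
The idea can be sketched outside Stream Analytics as follows. This is a minimal Python illustration under assumptions, not a translation of the query: `Time` is a plain number of seconds stored inside each event body, the look-back is a simple 60-second horizon measured from `now`, and `latest` and `last_pair` are hypothetical helper names.

```python
def latest(events, kind, now, horizon=60):
    """Most recent event of `kind` ('EventB' or 'EventC') within the horizon, or None."""
    candidates = [
        e for e in events
        if kind in e and now - horizon < e[kind]["Time"] <= now
    ]
    return max(candidates, key=lambda e: e[kind]["Time"], default=None)


def last_pair(events, now, horizon=60):
    """Pair the last EventB with the last EventC if their URIs match, else None."""
    b = latest(events, "EventB", now, horizon)
    c = latest(events, "EventC", now, horizon)
    if b is None or c is None:
        return None
    if b["EventB"]["Claim"]["Uri"] != c["EventC"]["Claim"]["EventBUri"]:
        return None
    return {"EventB": b["EventB"], "EventC": c["EventC"]}


events = [
    {"EventB": {"Time": 0, "Claim": {"EventAUri": "A/123", "Uri": "B/456"},
                "Metainfo": {"Color": "Green"}}},
    {"EventC": {"Time": 20, "Claim": {"EventBUri": "B/456"},
                "Target": {"City": "Berlin"}}},
    {"EventC": {"Time": 40, "Claim": {"EventBUri": "B/456"},
                "Target": {"City": "Wasserburg"}}},
]

pair = last_pair(events, now=45)
```

With these sample events the newest EventC (the one at second 40) is paired with the single EventB. Once both events are older than the horizon, no pair is produced.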

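I tested it on a real Event Hub with an Event Hub message publisher, so that I could simulate an event stream similar to your example:

I then observed the output locally to see whether I would get the correct result after the last event:

In addition, I added a Time property to every event (B and C), as you can see in the message simulator, because that property is used to order the events in the query. Of course, you can replace it with another property, such as EventEnqueuedUtcTime or something similar.

I hope you will find one of these two approaches useful for your final solution.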