Flink - 加入相同的流以过滤一些事件

Flink - Join same stream in order to filter some events

我有一个看起来像这样的数据流:

impressionId | id | name | eventType | timestamp

我需要过滤(忽略)没有匹配 'impressionId' 类型 'impression' 的“点击”类型事件(因此基本上忽略没有印象的点击事件) 然后计算我在特定时间 window.

总共有多少印象以及我有多少点击(对于 id/name 对)

这就是我解决问题的方法:

[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
   

Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");

Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, id, name, eventType, minuteWindow")
      .select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");

Table filteredClickCount = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
 DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();

我想做的是简单地创建两个 tables,一个有点击次数,一个有展示次数,'inner' 将点击次数与展示次数相结合,而相结合的那个则意味着它们是具有匹配印象的点击次数。

现在这行不通了,我不知道为什么!?

最后一个关节 table 产生的计数不正确。它在第一分钟有效,但之后计数几乎减少了一倍。

然后我尝试像这样修改最后一个 table:

Table clickWithMatchingImpression2 = clicksTable
      .join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
      .groupBy("concatClickId, clickMinute")
      .select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");

DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
    result2.print();

而且....这行得通!?但是我不知道为什么,也不知道如何处理这个 DataStream> 格式...当 table 没有 [= 时,Flink 拒绝使用 toAppendStream 40=]。 我想要一个只有最后数字的简单结构。

1 ) 我的做法正确吗?是否有更简单的方法来过滤没有印象的点击?

2 ) 为什么我的解决方案中的计数不正确?

我不确定我是否正确理解了您的用例,一个包含一些数据点的示例肯定会有所帮助。

让我解释一下您的代码在做什么。首先这两个表计算了过去 24 小时内有多少 clicks/impressions。 对于输入

new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)

你会得到 windows (array, window_start, window_end, rowtime):

[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...

因此,当您对 id 和 name 进行分组时,您会得到类似这样的信息:

1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...

如果您在 24 小时后再次分组 windows 您将多次计算具有相同 ID 的每个事件。

如果我正确理解了您的用例,并且您正在寻找在点击发生前后的 1 分钟内发生了多少次展示,那么 interval join 可能就是您要寻找的。您可以使用以下查询来实现您的案例:

Table clicks = eventsTable
        .where($("eventType").isEqual("click"))
        .select(
                $("impressionId").as("clickImpressionId"),
                concat($("id"), "_", $("name")).as("concatClickId"),
                $("id").as("clickId"),
                $("name").as("clickName"),
                $("eventTime").as("clickEventTime")
        );

Table impressions = eventsTable
        .where($("eventType").isEqual("impression"))
        .select(
                $("impressionId").as("impressionImpressionId"),
                concat($("id"), "_", $("name")).as("concatImpressionId"),
                $("id").as("impressionId"),
                $("name").as("impressionName"),
                $("eventTime").as("impressionEventTime")
        );

Table table = impressions.join(
        clicks,
        $("clickImpressionId").isEqual($("impressionImpressionId"))
                .and(
                        $("clickEventTime").between(
                                $("impressionEventTime").minus(lit(1).minutes()),
                                $("impressionEventTime"))
                ))
        .select($("concatClickId"), $("impressionEventTime"));

table
        .window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
        .groupBy($("concatClickId"), $("minuteWindow"))
        .select($("concatClickId"), $("concatClickId").count())
        .execute()
        .print();

至于为什么Flink有时候不能产生append stream,只能retract streamsee。简而言之,如果基于时间属性的操作不起作用,则结果为“有效”时不存在单个时间点。因此它必须发出变化流而不是单个附加值。元组中的第一个字段告诉您记录是插入(真)还是 retraction/deletion(假)。