Flink - 加入相同的流以过滤一些事件
Flink - Join same stream in order to filter some events
我有一个看起来像这样的数据流:
impressionId | id | name | eventType | timestamp
我需要过滤(忽略)没有匹配 'impressionId' 类型 'impression' 的“点击”类型事件(因此基本上忽略没有印象的点击事件)
然后计算我在特定时间 window.
总共有多少印象以及我有多少点击(对于 id/name 对)
这就是我解决问题的方法:
[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
Table clicksTable = eventsTable
.where("eventType = 'click'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");
Table impressionsTable = eventsTable
.where("eventType = 'impression'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");
Table filteredClickCount = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();
我想做的是简单地创建两个 tables,一个有点击次数,一个有展示次数,'inner' 将点击次数与展示次数相结合,而相结合的那个则意味着它们是具有匹配印象的点击次数。
现在这行不通了,我不知道为什么!?
最后一个关节 table 产生的计数不正确。它在第一分钟有效,但之后计数几乎减少了一倍。
然后我尝试像这样修改最后一个 table:
Table clickWithMatchingImpression2 = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
result2.print();
而且....这行得通!?但是我不知道为什么,也不知道如何处理这个 DataStream> 格式...当 table 没有 [= 时,Flink 拒绝使用 toAppendStream 40=]。
我想要一个只有最后数字的简单结构。
1 ) 我的做法正确吗?是否有更简单的方法来过滤没有印象的点击?
2 ) 为什么我的解决方案中的计数不正确?
我不确定我是否正确理解了您的用例,一个包含一些数据点的示例肯定会有所帮助。
让我解释一下您的代码在做什么。首先这两个表计算了过去 24 小时内有多少 clicks/impressions。
对于输入
new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)
你会得到 windows (array, window_start, window_end, rowtime):
[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...
因此,当您对 id 和 name 进行分组时,您会得到类似这样的信息:
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...
如果您在 24 小时后再次分组 windows 您将多次计算具有相同 ID 的每个事件。
如果我正确理解了您的用例,并且您正在寻找在点击发生前后的 1 分钟内发生了多少次展示,那么 interval join 可能就是您要寻找的。您可以使用以下查询来实现您的案例:
Table clicks = eventsTable
.where($("eventType").isEqual("click"))
.select(
$("impressionId").as("clickImpressionId"),
concat($("id"), "_", $("name")).as("concatClickId"),
$("id").as("clickId"),
$("name").as("clickName"),
$("eventTime").as("clickEventTime")
);
Table impressions = eventsTable
.where($("eventType").isEqual("impression"))
.select(
$("impressionId").as("impressionImpressionId"),
concat($("id"), "_", $("name")).as("concatImpressionId"),
$("id").as("impressionId"),
$("name").as("impressionName"),
$("eventTime").as("impressionEventTime")
);
Table table = impressions.join(
clicks,
$("clickImpressionId").isEqual($("impressionImpressionId"))
.and(
$("clickEventTime").between(
$("impressionEventTime").minus(lit(1).minutes()),
$("impressionEventTime"))
))
.select($("concatClickId"), $("impressionEventTime"));
table
.window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
.groupBy($("concatClickId"), $("minuteWindow"))
.select($("concatClickId"), $("concatClickId").count())
.execute()
.print();
至于为什么Flink有时候不能产生append stream,只能retract streamsee。简而言之,如果基于时间属性的操作不起作用,则结果为“有效”时不存在单个时间点。因此它必须发出变化流而不是单个附加值。元组中的第一个字段告诉您记录是插入(真)还是 retraction/deletion(假)。
我有一个看起来像这样的数据流:
impressionId | id | name | eventType | timestamp
我需要过滤(忽略)没有匹配 'impressionId' 类型 'impression' 的“点击”类型事件(因此基本上忽略没有印象的点击事件) 然后计算我在特定时间 window.
总共有多少印象以及我有多少点击(对于 id/name 对)这就是我解决问题的方法:
[...]
Table eventsTable = tEnv.fromDataStream(eventStreamWithTimeStamp, "impressionId, id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
Table clicksTable = eventsTable
.where("eventType = 'click'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as clickImpressionId, eventType as clickEventType, concat(concat(id,'_'), name) as concatClickId, id as clickId, name as clickName, minuteWindow.rowtime as clickMinute");
Table impressionsTable = eventsTable
.where("eventType = 'impression'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, id, name, eventType, minuteWindow")
.select("impressionId as impressionImpressionId, eventType as impressionEventType, concat(concat(id,'_'), name) as concatImpId, id as impId, name as impName, minuteWindow.rowtime as impMinute");
Table filteredClickCount = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.window(Slide.over("24.hour").every("1.minute").on("clickMinute").as("minuteWindow"))
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Test3> result = tEnv.toAppendStream(filteredClickCount, Test3.class);
result.print();
我想做的是简单地创建两个 tables,一个有点击次数,一个有展示次数,'inner' 将点击次数与展示次数相结合,而相结合的那个则意味着它们是具有匹配印象的点击次数。
现在这行不通了,我不知道为什么!?
最后一个关节 table 产生的计数不正确。它在第一分钟有效,但之后计数几乎减少了一倍。
然后我尝试像这样修改最后一个 table:
Table clickWithMatchingImpression2 = clicksTable
.join(impressionsTable, "clickImpressionId = impressionImpressionId && concatClickId = concatImpId && clickMinute = impMinute")
.groupBy("concatClickId, clickMinute")
.select("concatClickId, concatClickId.count as clickCount, clickMinute as eventTime");
DataStream<Tuple3<Boolean, Tuple3>> result2 = tEnv.toRetractStream(clickWithMatchingImpression2, Test3.class);
result2.print();
而且....这行得通!?但是我不知道为什么,也不知道如何处理这个 DataStream
1 ) 我的做法正确吗?是否有更简单的方法来过滤没有印象的点击?
2 ) 为什么我的解决方案中的计数不正确?
我不确定我是否正确理解了您的用例,一个包含一些数据点的示例肯定会有所帮助。
让我解释一下您的代码在做什么。首先这两个表计算了过去 24 小时内有多少 clicks/impressions。 对于输入
new Event("1", "1", "ABC", "...", 1),
new Event("1", "2", "ABC", "...", 2),
new Event("1", "3", "ABC", "...", 3),
new Event("1", "4", "ABC", "...", 4)
你会得到 windows (array
[1], 1969-12-31-01T00:01:00.000, 1970-01-01T00:01:00.000, 1970-01-01T00:00:59.999
[1, 2], 1969-12-31-01T00:02:00.000, 1970-01-01T00:02:00.000, 1970-01-01T00:01:59.999
[1, 2, 3], 1969-12-31-01T00:03:00.000, 1970-01-01T00:03:00.000, 1970-01-01T00:02:59.999
...
因此,当您对 id 和 name 进行分组时,您会得到类似这样的信息:
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:00:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:01:59.999
1, '...', '1_ABC', 1, 'ABC', 1970-01-01T00:02:59.999
...
如果您在 24 小时后再次分组 windows 您将多次计算具有相同 ID 的每个事件。
如果我正确理解了您的用例,并且您正在寻找在点击发生前后的 1 分钟内发生了多少次展示,那么 interval join 可能就是您要寻找的。您可以使用以下查询来实现您的案例:
Table clicks = eventsTable
.where($("eventType").isEqual("click"))
.select(
$("impressionId").as("clickImpressionId"),
concat($("id"), "_", $("name")).as("concatClickId"),
$("id").as("clickId"),
$("name").as("clickName"),
$("eventTime").as("clickEventTime")
);
Table impressions = eventsTable
.where($("eventType").isEqual("impression"))
.select(
$("impressionId").as("impressionImpressionId"),
concat($("id"), "_", $("name")).as("concatImpressionId"),
$("id").as("impressionId"),
$("name").as("impressionName"),
$("eventTime").as("impressionEventTime")
);
Table table = impressions.join(
clicks,
$("clickImpressionId").isEqual($("impressionImpressionId"))
.and(
$("clickEventTime").between(
$("impressionEventTime").minus(lit(1).minutes()),
$("impressionEventTime"))
))
.select($("concatClickId"), $("impressionEventTime"));
table
.window(Slide.over("24.hour").every("1.minute").on("impressionEventTime").as("minuteWindow"))
.groupBy($("concatClickId"), $("minuteWindow"))
.select($("concatClickId"), $("concatClickId").count())
.execute()
.print();
至于为什么Flink有时候不能产生append stream,只能retract streamsee。简而言之,如果基于时间属性的操作不起作用,则结果为“有效”时不存在单个时间点。因此它必须发出变化流而不是单个附加值。元组中的第一个字段告诉您记录是插入(真)还是 retraction/deletion(假)。