Flink: Left joining a stream with a static list
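I want to join a stream of attempts with a static list of blocked emails and group the result by IP, so that I can later count a set of related statistics. The results should be delivered as a sliding window of 30 minutes, emitted every 10 seconds. Here is one of several ways I have tried to achieve this: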
override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
This uses the user-defined table function below, which is already registered in my tableEnv as blockedEmailsList:
public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    public Row read(String email) {
        return Row.of(email);
    }

    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}
However, it returns the following error:
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
If I do as it suggests and cast created_at to TIMESTAMP, I get this instead:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
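I found other questions on Stack Overflow related to these exceptions, but they involve streams and temporal tables, and none of them addresses joining a stream with a static list.
Any ideas?
EDIT: It looks like there is an open issue in the Flink project for my use case: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
So I am also open to workaround suggestions.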
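I managed to implement a workaround that solved my problem!
Instead of joining the streamed attempts with the static list of emails, I map each attempt beforehand to a new attempt with an added blockedEmail attribute. If the static list blockedEmails contains the current Attempt's email, I set its blockedEmail attribute to true.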
DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});
The static list blockedEmails is of type HashSet, so each lookup takes O(1) time.
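For readers who want to try the tagging step outside of a Flink job, here is a minimal plain-Java sketch of the same idea. The class BlockedEmailTagger and the nested Attempt POJO are hypothetical stand-ins (the real Attempt class is not shown in this thread); only the HashSet lookup and the setBlockedEmail call mirror the workaround above.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BlockedEmailTagger {

    /** Hypothetical stand-in for the Attempt POJO used in the thread. */
    public static final class Attempt {
        private final String email;
        private boolean blockedEmail;

        public Attempt(String email) {
            this.email = email;
        }

        public String getEmail() {
            return email;
        }

        public boolean isBlockedEmail() {
            return blockedEmail;
        }

        public void setBlockedEmail(boolean blockedEmail) {
            this.blockedEmail = blockedEmail;
        }
    }

    // Copy into a HashSet so that contains() is an O(1) hash lookup.
    private final Set<String> blockedEmails;

    public BlockedEmailTagger(Collection<String> blockedEmails) {
        this.blockedEmails = new HashSet<>(blockedEmails);
    }

    /** Marks the attempt as blocked if its email is in the static set. */
    public Attempt tag(Attempt attempt) {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }

    public static void main(String[] args) {
        BlockedEmailTagger tagger = new BlockedEmailTagger(List.of("spam@example.com"));
        System.out.println(tagger.tag(new Attempt("spam@example.com")).isBlockedEmail()); // true
        System.out.println(tagger.tag(new Attempt("user@example.com")).isBlockedEmail()); // false
    }
}
```

Inside a Flink job the same body would sit in the map function shown earlier. Note that the set captured by that function is shipped to the workers along with it, so it has to be serializable (HashSet is).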
The final grouping query was adjusted to:
override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
So far, the general problem of joining a stream with a static list appears to remain unsolved, but in my case the workaround above handles it well.
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
The reason is that a join with a lateral table function is a regular join in Flink, and a regular join may emit null values, for example:
left:(K0, A), right(K1, T1) => send (K0, A, NULL, NULL)
left: , right(K0, T2) => retract (K0, A, NULL, NULL )
send (K0, A, K0, T2)
So the time attribute of the input stream is lost after the join.
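The send/retract sequence above can be replayed with a small toy model. This is only an illustrative sketch, not Flink's join operator: the class LeftJoinChangelogSketch and its methods are made up for this example, and it assumes at most one left row per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftJoinChangelogSketch {
    private final Map<String, String> left = new HashMap<>();   // key -> left payload
    private final Map<String, String> right = new HashMap<>();  // key -> right payload
    private final List<String> changelog = new ArrayList<>();

    /** A left row arrives: emit it joined, or padded with NULLs if no match exists yet. */
    public void onLeft(String key, String value) {
        left.put(key, value);
        String match = right.get(key);
        String rightSide = (match == null) ? "NULL, NULL" : key + ", " + match;
        changelog.add("send (" + key + ", " + value + ", " + rightSide + ")");
    }

    /** A right row arrives: retract the row emitted earlier for this key, then emit the new join result. */
    public void onRight(String key, String value) {
        String previous = right.put(key, value);
        String leftValue = left.get(key);
        if (leftValue == null) {
            return; // no left row for this key yet, nothing to emit
        }
        String oldRightSide = (previous == null) ? "NULL, NULL" : key + ", " + previous;
        changelog.add("retract (" + key + ", " + leftValue + ", " + oldRightSide + ")");
        changelog.add("send (" + key + ", " + leftValue + ", " + key + ", " + value + ")");
    }

    public List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        LeftJoinChangelogSketch join = new LeftJoinChangelogSketch();
        join.onRight("K1", "T1"); // right row with a different key
        join.onLeft("K0", "A");   // no match yet: padded with NULLs
        join.onRight("K0", "T2"); // match arrives later: retract, then send
        join.changelog().forEach(System.out::println);
    }
}
```

Running main prints the three changelog entries from the example: the NULL-padded row, its retraction, and the joined row.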
In your case you don't need a TableFunction; you can use a ScalarFunction instead, like:
public static class BlockedEmailFunction extends ScalarFunction {
    // Static list of blocked emails (contents elided in the original answer).
    private static List<String> blockedEmails = ...;

    // Returns true if the given email is in the blocked list.
    public Boolean eval(String email) {
        return blockedEmails.contains(email);
    }
}
// register the function
env.createTemporarySystemFunction("blockedEmailFunction", BlockedEmailFunction.class);
// call the registered function in SQL, then apply the window operation as needed
env.sqlQuery("SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts");