AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin]
When I execute the following statement with Flink SQL, the error below is reported:
Requirement
Group the data in user_behavior_kafka_table by the user_id field, then, within each group, take the row with the largest ts value.
SQL executed
SELECT user_id, item_id, ts
FROM user_behavior_kafka_table AS a
WHERE ts = (SELECT MAX(b.ts)
            FROM user_behavior_kafka_table AS b
            WHERE a.user_id = b.user_id);
Flink version
1.11.2
Error message
AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$1))], select=[user_id, item_id, ts, user_id0, EXPR$1], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
Job deployment
On YARN
Table messages
- user_behavior_kafka_table: data consumed from the Kafka topic (a sketch of possible DDL for both tables follows after the expected results below)
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目","ts":100 }
{"user_id":"ccc","item_id":"11-222-334","comment":"ccc 访问项在","ts":200 }
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项在","ts":300 }
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项位于","ts":200 }
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目","ts":200 }
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项位于","ts":400 }
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目","ts":400 }
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目","ts":200 }
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项位于","ts":300 }
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目","ts":300 }
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目","ts":100 }
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项位于","ts":100 }
- user_behavior_hive_table: expected result
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项位于","ts":400 }
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项位于","ts":300 }
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目","ts":400 }
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目","ts":200 }
To get the result you expect from this query, it would have to be executed in batch mode. As a streaming query, the Flink SQL planner can't handle it, and if it could, it would produce a stream of results in which the last result for each user_id would match the expected output, but with additional intermediate results along the way.
For example, for user aaa, these results would appear:
aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400
But the row with ts=300 would be skipped, because it is never the row with the maximum ts.
If you want this to work in streaming mode, try reformulating it as a top-n query:
SELECT user_id, item_id, ts
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;
I believe this should work, but I can't easily test it.
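In case it helps, here is a minimal usage sketch of how the rewritten query might be wired to the sink table, assuming the hypothetical DDL above (the sink's column list, including `comment`, is an assumption):

-- Hedged sketch, not from the original answer: write each user's latest row
-- into the sink, dropping the helper row_num column from the projection.
INSERT INTO user_behavior_hive_table
SELECT user_id, item_id, `comment`, ts
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;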