Flink SQL,如何通过eventtime获取kafka数据流中的第一条记录和最后一条记录存储到DB中(如GP,MySQL)?
Flink SQL, how to get the first record and the last record by eventtime in kafka data flow and store it to a DB(such as GP, MySQL)?
Flink SQL
,如何通过eventtime获取Kafka
数据流中的第一条记录和最后一条记录并存储到DB中(如MySQL
)?
另外,如果Kafka
数据流中有新记录,我们应该更新MySQL
中的记录。
- 假设,
Kafka
中的记录如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
{'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
{'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}
- 通过
Flink SQL
,我预期的结果如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
- 我们将这些记录存入
MySQL
,假设结果如下,
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 16:10:00 | 18 |
- 如果
Kafka
中的新记录即将到来,
{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
- 希望能更新
MySQL
中are
的记录,更新结果如下:
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 17:18:00 | 30 |
我在第2步和第5步遇到了一些麻烦,谁能给点意见?
按行时间排序的重复数据删除是最简单的方法,但这在 1.12 中受支持。 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
CREATE TABLE src (
word STRING,
eventtime TIMESTAMP(3),
appear_page INT,
WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
...
);
-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
FROM src
) WHERE rownum = 1;
这个查询在1.11应该也能用,但没有优化成去重,而是TopN算子,效率较低。
当我在 Flink SQL 中 运行 时,我得到这个 Table 异常 -
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
[错误] 无法执行 SQL 语句。原因:
org.apache.flink.table.api.TableException: window只能在升序模式下排序。
Flink 1.11.3 的 OVER() 可以不使用 DESC 吗?
谢谢,
Flink SQL
,如何通过eventtime获取Kafka
数据流中的第一条记录和最后一条记录并存储到DB中(如MySQL
)?
另外,如果Kafka
数据流中有新记录,我们应该更新MySQL
中的记录。
- 假设,
Kafka
中的记录如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
{'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
{'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}
- 通过
Flink SQL
,我预期的结果如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
- 我们将这些记录存入
MySQL
,假设结果如下,
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 16:10:00 | 18 |
- 如果
Kafka
中的新记录即将到来,
{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
- 希望能更新
MySQL
中are
的记录,更新结果如下:
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 17:18:00 | 30 |
我在第2步和第5步遇到了一些麻烦,谁能给点意见?
按行时间排序的重复数据删除是最简单的方法,但这在 1.12 中受支持。 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
CREATE TABLE src (
word STRING,
eventtime TIMESTAMP(3),
appear_page INT,
WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
...
);
-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
FROM src
) WHERE rownum = 1;
这个查询在1.11应该也能用,但没有优化成去重,而是TopN算子,效率较低。
当我在 Flink SQL 中 运行 时,我得到这个 Table 异常 -
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
[错误] 无法执行 SQL 语句。原因: org.apache.flink.table.api.TableException: window只能在升序模式下排序。
Flink 1.11.3 的 OVER() 可以不使用 DESC 吗?
谢谢,