Flink SQL,如何通过eventtime获取kafka数据流中的第一条记录和最后一条记录存储到DB中(如GP,MySQL)?

Flink SQL, how to get the first record and the last record by eventtime in kafka data flow and store it to a DB(such as GP, MySQL)?

Flink SQL,如何通过eventtime获取Kafka数据流中的第一条记录和最后一条记录并存储到DB中(如MySQL)?

另外,如果Kafka数据流中有新记录,我们应该更新MySQL中的记录。

  1. 假设,Kafka中的记录如下:
    {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
    {'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
    {'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}      
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}

  1. 通过Flink SQL,我预期的结果如下:
    {'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
  1. 我们将这些记录存入MySQL,假设结果如下,
    |    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 16:10:00     |             18             |
  1. 如果 Kafka 中的新记录即将到来,
    {'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
  1. 希望能更新MySQLare的记录,更新结果如下:
    |    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 17:18:00     |             30             |

我在第2步和第5步遇到了一些麻烦,谁能给点意见?

按行时间排序的重复数据删除是最简单的方法,但这在 1.12 中受支持。 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication

CREATE TABLE src (
  word STRING,
  eventtime TIMESTAMP(3),
  appear_page INT,
  WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  ...
);

-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
  FROM src
) WHERE rownum = 1;

这个查询在1.11应该也能用,但没有优化成去重,而是TopN算子,效率较低。

当我在 Flink SQL 中 运行 时,我得到这个 Table 异常 -

SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum

[错误] 无法执行 SQL 语句。原因: org.apache.flink.table.api.TableException: window只能在升序模式下排序。

Flink 1.11.3 的 OVER() 可以不使用 DESC 吗?

谢谢,