为什么 Flink Table SQL API upsert-kafka sink 连接器不创建日志压缩主题?

Why Flink Table SQL API upsert-kafka sink connector doesn't create a log compacted topic?

我正在尝试复制 Flink 的 upsert-kafka connector example

使用以下输入:

event_id,user_id,page_id,user_region,viewtime
e0,1,11,TR,2022-01-01T13:26:41.298Z
e1,1,22,TR,2022-01-02T13:26:41.298Z
e2,2,11,AU,2022-02-01T13:26:41.298Z

并创建了一个主题,其事件结构如下所示:

key: {"event_id":"e2"}, 
value: {"event_id": "e2", "user_id": 2, "page_id": 11, "user_region": "AU", "viewtime": "2022-02-01T13:26:41.298Z"}

使用以下 kafka upstream,kafka-upsert sink 逻辑:

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

我希望只为 {"user_region":"TR"} 获得一个密钥,更新后的 pv: 2,但是创建的主题似乎没有被日志压缩,因此观察到两个相同 user_region:

的事件
k: {"user_region":"AU"}, v: {"user_region":"AU","pv":1,"uv":1}
k: {"user_region":"TR"}, v: {"user_region":"TR","pv":2,"uv":1}
k: {"user_region":"TR"}, v: {"user_region":"TR","pv":1,"uv":1}

upsert-kafka 连接器不是应该为此用例创建一个日志压缩主题,还是开发人员有责任更新主题配置?

另一种可能是我误解了某些事情或做错了。 期待听到您的想法。谢谢。

当您使用 CREATE TABLE 创建用于 Flink SQL 的 table 时,您是在描述如何将现有数据存储解释为 table。换句话说,您正在 Flink 目录中创建元数据。首次访问时创建主题的是 Kafka,开发人员有责任调整日志配置以使用压缩策略。