在 Flink Table API 中应用 tumble window 聚合时,kafka 主题接收器没有结果

No results in kafka topic sink when applying tumble window aggregation in Flink Table API

我正在使用 lyft flink operator

部署的 Flink 1.14

我正在尝试使用 Table API 进行 tumble window 聚合,从交易 table 源中读取,并将聚合结果放入 window进入一个新的kafka话题

我的来源是来自 debezium 的 kafka 主题

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

    //this is the source
    tEnv.executeSql("CREATE TABLE transactions (\n" +
            " event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n"+
            "  transaction_time AS TO_TIMESTAMP_LTZ(4001, 3),\n"+
            "  id INT PRIMARY KEY,\n" +
            "  transaction_status STRING,\n" +
            "  transaction_type STRING,\n" +
            "  merchant_id INT,\n" +
            "  WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            " 'debezium-json.schema-include' = 'true' ,\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'dbserver1.inventory.transactions',\n" +
            " 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n"+
            " 'format' = 'debezium-json'\n" +
            ")");

我翻滚 window 并通过以下方式计算相同 window 中的 ID:

public static Table report(Table transactions) {
    return transactions
            .window(Tumble.over(lit(2).minutes()).on($("transaction_time")).as("w"))
            .groupBy($("w"), $("transaction_status"))
            .select(
                    $("w").start().as("window_start"),
                    $("w").end().as("window_end"),
                    $("transaction_status"),
                    $("id").count().as("id_count"));
}

水槽是:

tEnv.executeSql("CREATE TABLE my_report (\n" +
            "window_start TIMESTAMP(3),\n"+
            "window_end TIMESTAMP(3)\n,"+
            "transaction_status STRING,\n" +
            " id_count BIGINT,\n" +
            " PRIMARY KEY (window_start) NOT ENFORCED\n"+
            ") WITH (\n" +
            " 'connector' = 'upsert-kafka',\n" +
            " 'topic' = 'dbserver1.inventory.my-window-sink',\n" +
            " 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'key.format' = 'json',\n"+
            " 'value.format' = 'json'\n"+
            ")");
    Table transactions = tEnv.from("transactions");
    Table merchants = tEnv.from("merchants");
    report(transactions).executeInsert("my_report");

问题是当我使用 dbserver1.inventory.my-window-sink kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning 我没有得到任何结果,我等了 2 分钟(window 大小) ,插入事务 table 然后再次等待 2 分钟并再次插入也没有结果。 不知道是不是我的水印有问题

我正在使用并行度:2

在 flink 仪表板上 UI 我可以看到在 GroupWindowAggregate 任务的详细信息中,当我插入 table 时接收到的记录增加了,但是我仍然看不到结果我消费话题了!

用这条线

transaction_time AS TO_TIMESTAMP_LTZ(4001, 3)

您为每个事件设置了相同的交易时间 (4001),并且

WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND

您已根据 transaction_time 安排水印。这样一来,时间就静止了,windows永远也无法关闭。

至于“我等了 2 分钟(window 大小)”,这不是事件时间处理的工作方式。假设时间戳和水印实际上在向前移动,无论处理 2 分钟的数据需要多长时间,您都需要等待。

除了 David 谢天谢地的回答之外,我还缺少 table.exec.source.idle-timeout 作为流媒体环境的配置,这是一个检查源是否空闲的变量。 该变量的默认值为 0,这意味着它不会检查源是否空闲。我将它设置为 1000 毫秒并修复它,因为它检查空闲源条件并以这种方式正确生成水印。 这可能不会影响具有一致消息摄取的常规流,但对我来说就是这种情况,因为我手动插入记录,因此流在很多时候都处于空闲状态