Flink - SQL Tumble End on event time not returning any result

I have a Flink job that consumes a Kafka topic and tries to create windows based on a few columns such as eventId and eventName. The Kafka topic has eventTimestamp as its timestamp field, with timestamps in milliseconds.

DataStreamSource<String> kafkaStream = env.fromSource(
    kafkaSource, //kafkaSource is the KafkaSource builder
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource");

// Doing some transformations to map to POJO class.

Table kafkaTable = tableEnv.fromDataStream(
    kafkaStream, // the stream mapped to the POJO class above
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        // eventTimestamp is in millis
        .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTimestamp, 3)")
        .watermark("event_time", "event_time - INTERVAL '20' SECOND")
        .build());

The Tumble_End window query returns rows when using proc_time, but the query returns nothing when I use event_time.

-- This query returns nothing
SELECT TUMBLE_END(event_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)

-- This query gives some results
SELECT TUMBLE_END(proc_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)

I tried setting env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);, but it is deprecated since I am using the stable 1.14.4 release.

I also tried adding a custom WatermarkStrategy, but nothing worked. I cannot figure out this behavior. Can someone please help?
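For context, a strategy built with forBoundedOutOfOrderness emits watermarks that trail the highest event timestamp seen so far by the configured delay, so an event-time window only fires once some event arrives at least that delay past the window's end. A minimal plain-Java sketch of that semantics (the class and helper names are illustrative, not Flink API):

```java
// Illustrative sketch of bounded-out-of-orderness watermark semantics
// (a hypothetical helper mirroring the behavior, not the actual Flink class):
// the watermark = max timestamp seen - delay - 1 ms, so a window [start, end)
// fires only after an event with timestamp >= end + delay arrives.
public class WatermarkSketch {
    static final long OUT_OF_ORDERNESS_MS = 20_000L; // Duration.ofSeconds(20) in the job

    static long watermarkFor(long maxTimestampSeen) {
        // Flink subtracts an extra millisecond from the delayed timestamp
        return maxTimestampSeen - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1_647_614_520_000L; // end of one 1-minute tumbling window
        // An event inside the window does not push the watermark past its end:
        assert watermarkFor(1_647_614_467_000L) < windowEnd;
        // Only an event at least 20 s past the window end lets the window fire:
        assert watermarkFor(1_647_614_587_000L) >= windowEnd;
    }
}
```

This is why a topic that stops producing (or has idle partitions) can leave the watermark stalled and the window never fires.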

David - here is the code I am using.

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    val kafkaStream = env.fromSource(kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource")

    val kafkaRowMapper = kafkaStream.map(RowMapper())

    val finalTable = tableEnv.fromDataStream(kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )
    tableEnv.createTemporaryView("finalTable", finalTable)

    val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
            "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
            "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"
    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()

    env.execute("TestJob")

}

class RowMapper: MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val lineArray = value.split(",")

        return Tuple4 (lineArray[0], lineArray[1], lineArray[2].toLong(), lineArray[3].toFloat())
    }
}

The Kafka topic has values like this:

event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
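For reference, TUMBLE(event_time, INTERVAL '1' MINUTE) assigns each row to the one-minute window containing its timestamp. A small sketch of that assignment (the helper name is hypothetical, not Flink API) applied to the sample timestamps above:

```java
// Illustrative sketch of tumbling-window assignment: a timestamp t belongs to
// the window starting at t - (t % size). The sample rows are one minute apart,
// so each one lands in its own 1-minute window.
public class TumbleSketch {
    static long tumbleStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // The first three sample rows each fall in a different 1-minute window:
        assert tumbleStart(1_647_614_467_000L, oneMinute) == 1_647_614_460_000L;
        assert tumbleStart(1_647_614_527_000L, oneMinute) == 1_647_614_520_000L;
        assert tumbleStart(1_647_614_587_000L, oneMinute) == 1_647_614_580_000L;
    }
}
```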

I added the following line after creating the table environment, and then I was able to create windows using event_time:

tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")