Flink - SQL Tumble End on event time 不返回任何结果
Flink - SQL Tumble End on event time not returning any result
我有一个 Flink 作业,它消费一个 Kafka 主题,并尝试按 eventId、eventName 等几列创建窗口(windows)。Kafka 主题以 eventTimestamp 作为时间戳字段,时间戳以毫秒为单位。
DataStreamSource kafkaStream = env.fromSource(
kafkaSource, //kafkaSource is the KafkaSource builder
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource");
// Doing some transformations to map to POJO class.
Table kafkaTable = tableEnv.fromDataStream(
kafkaSource,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
// eventTimestamp is in millis
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTimestamp, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build();
使用 proc_time 时,TUMBLE_END 窗口查询会返回行;但使用 event_time 时,查询不返回任何内容。
-- This query returns no rows (event-time window)
SELECT TUMBLE_END(event_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
-- This query gives some results
SELECT TUMBLE_END(proc_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)
我尝试设置 `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);`,但我使用的是 1.14.4 稳定版,该方法已被弃用。
我也尝试添加自定义 WatermarkStrategy,但没有任何效果。我无法理解这种行为,有人可以帮忙吗?
David - 这是我正在使用的代码。
/**
 * Streams comma-separated events from the Kafka topic `an-topic` into a table and prints, for every
 * `eventId`/`eventName` pair and 1-minute tumbling event-time window, the window end
 * (`event_time_new`) and the last `eventValue` seen in that window.
 */
fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    // Event-time windows fire only once the watermark passes their end. A source input that receives
    // no records (likely an empty Kafka partition or an idle source subtask) holds the watermark back,
    // and without this setting the event-time query printed nothing. After 5 s without records an
    // input is marked idle and no longer blocks watermark progress.
    tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()
    val kafkaStream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
        "KafkaSource"
    )
    // Each record becomes Tuple4(f0 = eventId, f1 = eventName, f2 = timestamp millis, f3 = value).
    val kafkaRowMapper = kafkaStream.map(RowMapper())
    val finalTable = tableEnv.fromDataStream(
        kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            // f2 holds epoch milliseconds, hence precision 3.
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            // Declares event_time as the event-time attribute with a 20 s out-of-orderness bound, so a
            // window is emitted only after a record roughly 20 s past its end has been seen.
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )
    tableEnv.createTemporaryView("finalTable", finalTable)

    val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
        "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
        "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"
    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()
    env.execute("TestJob")
}
/**
 * Turns one comma-separated Kafka record `eventId,eventName,timestampMillis,value` into a
 * [Tuple4] of (eventId, eventName, timestamp, value).
 *
 * Fields are read by position and anything after the fourth is ignored. A record with too few
 * fields, or a non-numeric timestamp or value, makes [map] throw.
 */
class RowMapper : MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val fields = value.split(",")
        val eventId = fields[0]
        val eventName = fields[1]
        val timestampMillis = fields[2].toLong()
        val reading = fields[3].toFloat()
        return Tuple4(eventId, eventName, timestampMillis, reading)
    }
}
Kafka 主题中的值如下:
event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
在创建 table 环境之后添加下面这一行,就可以基于 event_time 创建窗口了。原因很可能是某些 Kafka 分区或 source 并行实例没有数据,空闲输入会阻止水位线推进,因而事件时间窗口始终不会触发。设置 idle-timeout 后,空闲输入会被标记为 idle,不再阻塞水位线:
// Mark a source input idle after 5 s without records so it stops holding back the watermark.
tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "5000 ms")
我有一个 Flink 作业,它消费一个 Kafka 主题,并尝试按 eventId、eventName 等几列创建窗口(windows)。Kafka 主题以 eventTimestamp 作为时间戳字段,时间戳以毫秒为单位。
DataStreamSource kafkaStream = env.fromSource(
kafkaSource, //kafkaSource is the KafkaSource builder
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource");
// Doing some transformations to map to POJO class.
Table kafkaTable = tableEnv.fromDataStream(
kafkaSource,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
// eventTimestamp is in millis
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTimestamp, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build();
使用 proc_time 时,TUMBLE_END 窗口查询会返回行;但使用 event_time 时,查询不返回任何内容。
-- This query returns no rows (event-time window)
SELECT TUMBLE_END(event_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
-- This query gives some results
SELECT TUMBLE_END(proc_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)
我尝试设置 `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);`,但我使用的是 1.14.4 稳定版,该方法已被弃用。
我也尝试添加自定义 WatermarkStrategy,但没有任何效果。我无法理解这种行为,有人可以帮忙吗?
David - 这是我正在使用的代码。
/**
 * Streams comma-separated events from the Kafka topic `an-topic` into a table and prints, for every
 * `eventId`/`eventName` pair and 1-minute tumbling event-time window, the window end
 * (`event_time_new`) and the last `eventValue` seen in that window.
 */
fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    // Event-time windows fire only once the watermark passes their end. A source input that receives
    // no records (likely an empty Kafka partition or an idle source subtask) holds the watermark back,
    // and without this setting the event-time query printed nothing. After 5 s without records an
    // input is marked idle and no longer blocks watermark progress.
    tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()
    val kafkaStream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
        "KafkaSource"
    )
    // Each record becomes Tuple4(f0 = eventId, f1 = eventName, f2 = timestamp millis, f3 = value).
    val kafkaRowMapper = kafkaStream.map(RowMapper())
    val finalTable = tableEnv.fromDataStream(
        kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            // f2 holds epoch milliseconds, hence precision 3.
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            // Declares event_time as the event-time attribute with a 20 s out-of-orderness bound, so a
            // window is emitted only after a record roughly 20 s past its end has been seen.
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )
    tableEnv.createTemporaryView("finalTable", finalTable)

    val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
        "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
        "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"
    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()
    env.execute("TestJob")
}
/**
 * Turns one comma-separated Kafka record `eventId,eventName,timestampMillis,value` into a
 * [Tuple4] of (eventId, eventName, timestamp, value).
 *
 * Fields are read by position and anything after the fourth is ignored. A record with too few
 * fields, or a non-numeric timestamp or value, makes [map] throw.
 */
class RowMapper : MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val fields = value.split(",")
        val eventId = fields[0]
        val eventName = fields[1]
        val timestampMillis = fields[2].toLong()
        val reading = fields[3].toFloat()
        return Tuple4(eventId, eventName, timestampMillis, reading)
    }
}
Kafka 主题中的值如下:
event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
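在创建 table 环境之后添加下面这一行,就可以基于 event_time 创建窗口了。原因很可能是某些 Kafka 分区或 source 并行实例没有数据,空闲输入会阻止水位线推进,因而事件时间窗口始终不会触发。设置 idle-timeout 后,空闲输入会被标记为 idle,不再阻塞水位线:
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")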