Flink table 异常:Window 聚合只能在时间属性列上定义,但遇到 TIMESTAMP(6)
Flink table exception : Window aggregate can only be defined over a time attribute column, but TIMESTAMP(6) encountered
我使用的是 flink 1.12.0。尝试将数据流转换为 table A 和 运行 tableA 上的 sql 查询以聚合 window 为 below.I我使用 f2 列作为时间戳数据类型字段。
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
Properties props = new Properties();
props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
props.setProperty("schema.registry.url", xxx);
props.setProperty("group.id", "test");
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("client.id", "flink-kafka-example");
FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
"test-topic",
ConfluentRegistryAvroDeserializationSchema.forSpecific(
Avrotest.class, prodSchemaRegistryURL),
props);
DataStreamSource<Avrotest> stream =
env.addSource(kafkaConsumer);
Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2"));
Table result =
tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
+ tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );
tEnv.toAppendStream(result,user.class).print();
env.execute("Flink kafka test");
}
当我执行上面的代码时,我得到
线程“main”中的异常org.apache.flink.table.api.TableException:Window聚合只能在时间属性列上定义,但遇到了 TIMESTAMP(6)。
在 org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50)
在 org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81)
在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
在 org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
为了使用 table API 对数据流执行事件时间窗口化,您需要先分配时间戳和水印。您应该在调用 fromDataStream
.
之前执行此操作
对于 Kafka,通常最好直接在 FlinkKafkaConsumer
上调用 assignTimestampsAndWatermarks
。有关详细信息,请参阅 watermark docs, kafka connector docs, and Flink SQL docs。
3 个步骤:
- 先赋值
assignTimestampsAndWatermarks
你有几种类型的策略。
例如:
WatermarkStrategy<Row> customTime = WatermarkStrategy
.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));
- 在您的源代码中分配您在第 1 步中声明的内容:
env.addSource().assignTimestampsAndWatermarks(customTime)
- 声明 table,并为时间戳字段设置行时间:
Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2").rowtime());
我使用的是 flink 1.12.0。尝试将数据流转换为 table A 和 运行 tableA 上的 sql 查询以聚合 window 为 below.I我使用 f2 列作为时间戳数据类型字段。
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
Properties props = new Properties();
props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
props.setProperty("schema.registry.url", xxx);
props.setProperty("group.id", "test");
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("client.id", "flink-kafka-example");
FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
"test-topic",
ConfluentRegistryAvroDeserializationSchema.forSpecific(
Avrotest.class, prodSchemaRegistryURL),
props);
DataStreamSource<Avrotest> stream =
env.addSource(kafkaConsumer);
Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2"));
Table result =
tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
+ tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );
tEnv.toAppendStream(result,user.class).print();
env.execute("Flink kafka test");
}
当我执行上面的代码时,我得到
线程“main”中的异常org.apache.flink.table.api.TableException:Window聚合只能在时间属性列上定义,但遇到了 TIMESTAMP(6)。 在 org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50) 在 org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81) 在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) 在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) 在 org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) 在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) 在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
为了使用 table API 对数据流执行事件时间窗口化,您需要先分配时间戳和水印。您应该在调用 fromDataStream
.
对于 Kafka,通常最好直接在 FlinkKafkaConsumer
上调用 assignTimestampsAndWatermarks
。有关详细信息,请参阅 watermark docs, kafka connector docs, and Flink SQL docs。
3 个步骤:
- 先赋值
assignTimestampsAndWatermarks
你有几种类型的策略。
例如:
WatermarkStrategy<Row> customTime = WatermarkStrategy
.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));
- 在您的源代码中分配您在第 1 步中声明的内容:
env.addSource().assignTimestampsAndWatermarks(customTime)
- 声明 table,并为时间戳字段设置行时间:
Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2").rowtime());