Flink SQL: Repeating grouping keys in result of GROUP BY query
I want to run a simple query over a table in Flink SQL that includes a GROUP BY clause. However, the result contains duplicate rows for the column specified in the GROUP BY clause. Is this because I am using a streaming environment and it does not remember state?
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID

FlinkKafkaConsumer011<BlocksTransactions> flinkBlocksTransactionsConsumer =
        new FlinkKafkaConsumer011<>(args[0], new BlocksTransactionsSchema(), props);
flinkBlocksTransactionsConsumer.setStartFromEarliest();

DataStream<BlocksTransactions> blocksTransactions = env.addSource(flinkBlocksTransactionsConsumer);
tableEnv.registerDataStream("blocksTransactionsTable", blocksTransactions);

Table sqlResult = tableEnv.sqlQuery(
        "SELECT block_hash, count(tx_hash) " +
        "FROM blocksTransactionsTable " +
        "GROUP BY block_hash");

DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult, Row.class)
        .map(t -> {
            Row r = t.f1;
            String field2 = r.getField(0).toString();
            long count = Long.valueOf(r.getField(1).toString());
            return new Test(field2, count);
        })
        .returns(Test.class);

resultStream.print();
resultStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "TargetTopic", new TestSchema()));

env.execute();
I use a GROUP BY on the block_hash column, yet the same block_hash appears several times. This is the output of print():
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=2}
Test{field2='0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60', count=1}
Test{field2='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count=1}
Test{field2='0x182d31bd491527e1e93c4e44686057207ee90c6a8428308a2bd7b6a4d2e10e53', count=1}
How can I fix this without using a BatchEnvironment?
A GROUP BY query that runs on a stream must produce updates. Consider the following example:

SELECT user, COUNT(*) FROM clicks GROUP BY user;

Every time the clicks table receives a new row, the count for the corresponding user needs to be incremented and updated.
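This incremental behavior can be sketched in plain Java (a HashMap simulation of the aggregation state, not Flink code; the input rows are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class GroupByUpdates {
    public static void main(String[] args) {
        // Hypothetical stream of user names arriving in the clicks table.
        String[] clicks = {"Bob", "Liz", "Bob"};

        // Aggregation state: one count per grouping key.
        Map<String, Long> counts = new HashMap<>();
        for (String user : clicks) {
            // Each incoming row increments the count for its key, so the
            // result for that key is UPDATED, not appended as a new row.
            long updated = counts.merge(user, 1L, Long::sum);
            System.out.println("(" + user + ", " + updated + ")");
        }
        // Emits one updated result per input row:
        // (Bob, 1), (Liz, 1), (Bob, 2)
    }
}
```

Because a new result is emitted for every input row, a downstream consumer that simply appends these results sees the same key multiple times, which is exactly the duplication in the question.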
When you convert the Table into a DataStream, these updates must be encoded in the stream. Flink does this with retraction and addition messages. By calling tEnv.toRetractStream(table, Row.class), you convert the Table table into a DataStream<Tuple2<Boolean, Row>>. The Boolean flag is important: it indicates whether the Row is added to or retracted from the result table.
Given the example query above and the input table clicks as

user | ...
------------
Bob  | ...
Liz  | ...
Bob  | ...

you will receive the following retraction stream:
(+, (Bob, 1)) // add first result for Bob
(+, (Liz, 1)) // add first result for Liz
(-, (Bob, 1)) // remove outdated result for Bob
(+, (Bob, 2)) // add updated result for Bob
You need to actively maintain the result yourself, adding and removing rows as indicated by the Boolean flag of the retraction stream.
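The bookkeeping described above can be sketched in plain Java. This is a simulation of consuming the retract stream, not Flink API; the Message record is a hypothetical stand-in for Flink's Tuple2<Boolean, Row>:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetractStreamConsumer {
    // Stand-in for Tuple2<Boolean, Row>: the flag says whether the
    // (key, count) pair is an addition or a retraction.
    record Message(boolean add, String key, long count) {}

    public static void main(String[] args) {
        // The retraction stream from the example above.
        List<Message> stream = List.of(
                new Message(true,  "Bob", 1),
                new Message(true,  "Liz", 1),
                new Message(false, "Bob", 1),  // remove outdated result
                new Message(true,  "Bob", 2)); // add updated result

        // Actively maintained result table:
        // insert on an add message, delete on a retraction.
        Map<String, Long> result = new TreeMap<>();
        for (Message m : stream) {
            if (m.add()) {
                result.put(m.key(), m.count());
            } else {
                result.remove(m.key());
            }
        }
        System.out.println(result); // prints {Bob=2, Liz=1}
    }
}
```

In the pipeline from the question, the map function drops the Boolean flag t.f0, so both additions and retractions are forwarded to print() and Kafka as if they were final rows; applying this kind of add/retract handling (or at least filtering on t.f0) removes the apparent duplicates.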