Apache Flink 中是否有相当于 Kafka 的 KTable?
Is there an equivalent to Kafka's KTable in Apache Flink?
Apache Kafka has a concept of a KTable,其中
where each data record represents an update
本质上,我可以使用一个kafka主题,并且只保留每个键的最新消息。
Apache Flink 中是否有类似的概念?我读过 Flink's Table API,但似乎没有解决同样的问题。
比较和对比这两个框架的一些帮助会有所帮助。我不是在寻找哪个更好或更坏。而是它们的不同之处。那么正确的答案将取决于我的要求。
你是对的。 Flink的TableAPI及其Table
class不对应Kafka的KTable。 Table API 是一种嵌入关系语言的 API(想想 SQL 集成在 Java 和 Scala 中)。
Flink 的DataStream API 没有内置一个KTable 对应的概念。相反,Flink 提供了复杂的状态管理,并且 KTable 将是 keyed state 的常规运算符。
例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值并将其与第二个输入的值连接,可以使用 CoFlatMapFunction
实现如下:
DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...
DataStream<Tuple2<String, String>> result = first
// connect first and second stream
.connect(second)
// key both streams on the first (Long) attribute
.keyBy(0, 0)
// join them
.flatMap(new TableLookup());
// ------
public static class TableLookup
extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {
// keyed state
private ValueState<String> lastVal;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<String> valueDesc =
new ValueStateDescriptor<String>("table", Types.STRING);
lastVal = getRuntimeContext().getState(valueDesc);
}
@Override
public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// update the value for the current Long key with the String value.
lastVal.update(value.f1);
}
@Override
public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// look up latest String for current Long key.
String lookup = lastVal.value();
// emit current String and looked-up String
out.collect(Tuple2.of(value.f1, lookup));
}
}
总的来说,状态可以非常灵活地与 Flink 一起使用,让您实现广泛的用例。还有更多的状态类型,例如 ListState
and MapState
and with a ProcessFunction
你可以对时间进行细粒度的控制,例如如果某个键在一定时间内没有更新,则删除它的状态(KTable据我所知有一个配置)。
Apache Kafka has a concept of a KTable,其中
where each data record represents an update
本质上,我可以使用一个kafka主题,并且只保留每个键的最新消息。
Apache Flink 中是否有类似的概念?我读过 Flink's Table API,但似乎没有解决同样的问题。
比较和对比这两个框架的一些帮助会有所帮助。我不是在寻找哪个更好或更坏。而是它们的不同之处。那么正确的答案将取决于我的要求。
你是对的。 Flink的TableAPI及其Table
class不对应Kafka的KTable。 Table API 是一种嵌入关系语言的 API(想想 SQL 集成在 Java 和 Scala 中)。
Flink 的DataStream API 没有内置一个KTable 对应的概念。相反,Flink 提供了复杂的状态管理,并且 KTable 将是 keyed state 的常规运算符。
例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值并将其与第二个输入的值连接,可以使用 CoFlatMapFunction
实现如下:
DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...
DataStream<Tuple2<String, String>> result = first
// connect first and second stream
.connect(second)
// key both streams on the first (Long) attribute
.keyBy(0, 0)
// join them
.flatMap(new TableLookup());
// ------
public static class TableLookup
extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {
// keyed state
private ValueState<String> lastVal;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<String> valueDesc =
new ValueStateDescriptor<String>("table", Types.STRING);
lastVal = getRuntimeContext().getState(valueDesc);
}
@Override
public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// update the value for the current Long key with the String value.
lastVal.update(value.f1);
}
@Override
public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// look up latest String for current Long key.
String lookup = lastVal.value();
// emit current String and looked-up String
out.collect(Tuple2.of(value.f1, lookup));
}
}
总的来说,状态可以非常灵活地与 Flink 一起使用,让您实现广泛的用例。还有更多的状态类型,例如 ListState
and MapState
and with a ProcessFunction
你可以对时间进行细粒度的控制,例如如果某个键在一定时间内没有更新,则删除它的状态(KTable据我所知有一个配置)。