Apache Flink 中是否有相当于 Kafka 的 KTable?

Is there an equivalent to Kafka's KTable in Apache Flink?

Apache Kafka has a concept of a KTable,其中

where each data record represents an update

本质上,我可以使用一个kafka主题,并且只保留每个键的最新消息。

Apache Flink 中是否有类似的概念?我读过 Flink's Table API,但似乎没有解决同样的问题。

比较和对比这两个框架的一些帮助会有所帮助。我不是在寻找哪个更好或更坏。而是它们的不同之处。那么正确的答案将取决于我的要求。

你是对的。 Flink的TableAPI及其Tableclass不对应Kafka的KTable。 Table API 是一种嵌入关系语言的 API(想想 SQL 集成在 Java 和 Scala 中)。

Flink 的DataStream API 没有内置一个KTable 对应的概念。相反,Flink 提供了复杂的状态管理,并且 KTable 将是 keyed state 的常规运算符。

例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值并将其与第二个输入的值连接,可以使用 CoFlatMapFunction 实现如下:

DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...

DataStream<Tuple2<String, String>> result = first
  // connect first and second stream
  .connect(second)
  // key both streams on the first (Long) attribute
  .keyBy(0, 0)
  // join them
  .flatMap(new TableLookup());

// ------

public static class TableLookup 
  extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {

  // keyed state
  private ValueState<String> lastVal;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<String> valueDesc = 
      new ValueStateDescriptor<String>("table", Types.STRING);
    lastVal = getRuntimeContext().getState(valueDesc);
  }

  @Override
  public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
    // update the value for the current Long key with the String value.
    lastVal.update(value.f1);
  }

  @Override
  public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
    // look up latest String for current Long key.
    String lookup = lastVal.value();
    // emit current String and looked-up String
    out.collect(Tuple2.of(value.f1, lookup));
  }
}

总的来说,状态可以非常灵活地与 Flink 一起使用,让您实现广泛的用例。还有更多的状态类型,例如 ListState and MapState and with a ProcessFunction 你可以对时间进行细粒度的控制,例如如果某个键在一定时间内没有更新,则删除它的状态(KTable据我所知有一个配置)。