Flink:DataStream 上没有外连接?

Flink: no outer joins on DataStream?

我很惊讶地发现 Flink (DataStream docs) 中 DataStream 没有外连接。

对于 DataSet,您有所有选项:leftOuterJoinrightOuterJoinfullOuterJoin,除了常规的 joinDataSet docs ).但是对于 DataStream 你只有普通的旧连接。

这是由于 DataStream 的某些基本属性导致无法进行外部联接吗?或者也许我们可以在(关闭?)未来期待这一点?

我真的可以在 DataStream 上使用外部连接来解决我正在处理的问题...有什么方法可以实现类似的行为吗?

您可以使用 DataStream.coGroup() 转换实现外连接。 CoGroupFunction 接收两个迭代器(每个输入一个),它们服务于某个键的所有元素,如果没有找到匹配的元素,它可能为空。这允许实现外连接功能。

First-class 对外部连接的支持可能会添加到 Flink 的下一个版本中的 DataStream API 中。我目前不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。

一种方法是从流 -> table -> 流,使用以下 api:FLINK TABLE API - OUTER JOIN

这是一个 java 示例:

    DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);