Flink:DataStream 上没有外连接?
Flink: no outer joins on DataStream?
我很惊讶地发现 Flink (DataStream docs) 中 DataStream
没有外连接。
对于 DataSet
,您有所有选项:leftOuterJoin
、rightOuterJoin
和 fullOuterJoin
,除了常规的 join
(DataSet docs ).但是对于 DataStream
你只有普通的旧连接。
这是由于 DataStream
的某些基本属性导致无法进行外部联接吗?或者也许我们可以在(关闭?)未来期待这一点?
我真的可以在 DataStream
上使用外部连接来解决我正在处理的问题...有什么方法可以实现类似的行为吗?
您可以使用 DataStream.coGroup()
转换实现外连接。 CoGroupFunction
接收两个迭代器(每个输入一个),它们服务于某个键的所有元素,如果没有找到匹配的元素,它可能为空。这允许实现外连接功能。
First-class 对外部连接的支持可能会添加到 Flink 的下一个版本中的 DataStream API 中。我目前不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。
一种方法是从流 -> table -> 流,使用以下 api:FLINK TABLE API - OUTER JOIN
这是一个 java 示例:
DataStream<String> data = env.readTextFile( ... );
DataStream<String> data2Merge = env.readTextFile( ... );
...
tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
Table tableLeft = tableEnv.sqlQuery(queryLeft);
Table tableRight = tableEnv.sqlQuery(queryRight);
Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
我很惊讶地发现 Flink (DataStream docs) 中 DataStream
没有外连接。
对于 DataSet
,您有所有选项:leftOuterJoin
、rightOuterJoin
和 fullOuterJoin
,除了常规的 join
(DataSet docs ).但是对于 DataStream
你只有普通的旧连接。
这是由于 DataStream
的某些基本属性导致无法进行外部联接吗?或者也许我们可以在(关闭?)未来期待这一点?
我真的可以在 DataStream
上使用外部连接来解决我正在处理的问题...有什么方法可以实现类似的行为吗?
您可以使用 DataStream.coGroup()
转换实现外连接。 CoGroupFunction
接收两个迭代器(每个输入一个),它们服务于某个键的所有元素,如果没有找到匹配的元素,它可能为空。这允许实现外连接功能。
First-class 对外部连接的支持可能会添加到 Flink 的下一个版本中的 DataStream API 中。我目前不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。
一种方法是从流 -> table -> 流,使用以下 api:FLINK TABLE API - OUTER JOIN
这是一个 java 示例:
DataStream<String> data = env.readTextFile( ... );
DataStream<String> data2Merge = env.readTextFile( ... );
...
tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
Table tableLeft = tableEnv.sqlQuery(queryLeft);
Table tableRight = tableEnv.sqlQuery(queryRight);
Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);