Flink:DataStream 左连接 Table。超级简单
Flink: DataStream left join Table. Super simple
DataStream<String> sourceStream = streamEnv.fromElements("key_a", "key_b", "key_c", "key_d");
Table lookupTable = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("my_key", DataTypes.STRING()),
DataTypes.FIELD("my_value", DataTypes.STRING())
),
Expressions.row("key_a", "value_a"),
Expressions.row("key_b", "value_b")
);
我想离开加入流到 table。
这显然是一个简化的演示场景。在使用更大的生产数据集之前,我想了解如何使用 Flink API 通过玩具数据集实现此目的。
Table 连接上的文档展示了如何连接两个 table 并返回另一个 table,这不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins
关于 DataStream 连接的文档显示一次连接两个流 window,这也不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
我相信这就是您要找的。此示例将 sourceStream 转换为动态 table,将其与查找 table 连接,然后将生成的动态 table 转换回流以供打印。
您可以使用 DataStream API.
对 resultStream 进行进一步处理
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class JoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<String> sourceStream = env.fromElements("key_a", "key_b", "key_c", "key_d");
Table streamTable = tableEnv.fromDataStream(sourceStream, $("stream_key"));
Table lookupTable = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("lookup_key", DataTypes.STRING()),
DataTypes.FIELD("lookup_value", DataTypes.STRING())
),
Expressions.row("key_a", "value_a"),
Expressions.row("key_b", "value_b")
);
Table resultTable = streamTable
.join(lookupTable).where($("stream_key").isEqual($("lookup_key")))
.select($("stream_key"), $("lookup_value"));
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
resultStream.print();
env.execute();
}
}
输出为
key_b,value_b
key_a,value_a
DataStream<String> sourceStream = streamEnv.fromElements("key_a", "key_b", "key_c", "key_d");
Table lookupTable = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("my_key", DataTypes.STRING()),
DataTypes.FIELD("my_value", DataTypes.STRING())
),
Expressions.row("key_a", "value_a"),
Expressions.row("key_b", "value_b")
);
我想离开加入流到 table。
这显然是一个简化的演示场景。在使用更大的生产数据集之前,我想了解如何使用 Flink API 通过玩具数据集实现此目的。
Table 连接上的文档展示了如何连接两个 table 并返回另一个 table,这不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins
关于 DataStream 连接的文档显示一次连接两个流 window,这也不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
我相信这就是您要找的。此示例将 sourceStream 转换为动态 table,将其与查找 table 连接,然后将生成的动态 table 转换回流以供打印。
您可以使用 DataStream API.
对 resultStream 进行进一步处理import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class JoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<String> sourceStream = env.fromElements("key_a", "key_b", "key_c", "key_d");
Table streamTable = tableEnv.fromDataStream(sourceStream, $("stream_key"));
Table lookupTable = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("lookup_key", DataTypes.STRING()),
DataTypes.FIELD("lookup_value", DataTypes.STRING())
),
Expressions.row("key_a", "value_a"),
Expressions.row("key_b", "value_b")
);
Table resultTable = streamTable
.join(lookupTable).where($("stream_key").isEqual($("lookup_key")))
.select($("stream_key"), $("lookup_value"));
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
resultStream.print();
env.execute();
}
}
输出为
key_b,value_b
key_a,value_a