如何从 Mysql 数据库创建 DataStreamSource?
How to create a DataStreamSource from a Mysql Database?
我有一个问题 运行 一个 flink 作业基本上是 运行 对 mysql 数据库的查询,然后尝试创建一个必须从不同的数据库访问的临时视图工作。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final TypeInformation<?>[] fieldTypes =
new TypeInformation<?>[] {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
String selectQuery = "select * from ***";
String driverName = "***";
String sourceDb = "***";
String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
String dbPassword = "***";
String dbUser = "***";
JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(driverName)
.setDBUrl(dbUrl + sourceDb)
.setQuery(selectQuery)
.setRowTypeInfo(rowTypeInfo)
.setUsername(dbUser)
.setPassword(dbPassword);
DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table customerTable =
tableEnv.fromDataStream(source).as("id", "name", "test");
tableEnv.createTemporaryView("***", ***Table);
Table resultTable = tableEnv.sqlQuery(
"SELECT * FROM ***");
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
resultStream.print();
env.execute();
我是 Flink 的新手,我目前正在研究为所有这些提供的 API,但我实际上无法理解我做错了什么。在我看来,通过在作业结束时打印结果来测试这个过程似乎很简单,但我唯一打印出来的是这样的:
2022-02-14 12:22:57,702 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.
这项工作的重点是创建一个临时 table 视图,用于缓存一些静态数据,这些数据将通过查询该 table 视图用于其他 Flink 作业。
首先,测试mysql的数据是否可以正常读取
也许你可以直接打印源结果如下
DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
source.print()
env.execute();
有关如何将 MySQL 与 Flink 一起使用的更多上下文,请参阅 。作为流式数据源,使用 MySQL 的 write-ahead-log 作为 CDC 流更常见,但有时采用的另一种方法(但 Flink 的 API 不鼓励)是定期轮询 MySQL 使用 SELECT 查询。
至于您尝试过的方法,不鼓励对流作业使用 createInput
,因为这不适用于 Flink 的检查点机制。与其使用 hadoop 输入格式,不如选择一种可用的源连接器。
临时视图不包含任何数据,也不是可以从其他作业访问的内容。 Flink table 或视图是描述存储在其他地方(例如 mysql 或 kafka)的数据如何被 Flink 解释为 table 的元数据。您可以将视图存储在目录中,以便多个作业可以共享其定义,但底层数据将保留在外部数据存储中,只有视图元数据存储在目录中。
所以在这种情况下,您编写的作业将创建一个临时视图,该视图仅对该作业可见,对其他作业不可见(因为它是临时视图,而不是存储在持久目录中的持久视图) .您的作业的输出不会在日志文件中,而是转到标准输出,或每个任务管理器的日志目录中的 *.out
文件。
我有一个问题 运行 一个 flink 作业基本上是 运行 对 mysql 数据库的查询,然后尝试创建一个必须从不同的数据库访问的临时视图工作。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final TypeInformation<?>[] fieldTypes =
new TypeInformation<?>[] {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
String selectQuery = "select * from ***";
String driverName = "***";
String sourceDb = "***";
String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
String dbPassword = "***";
String dbUser = "***";
JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(driverName)
.setDBUrl(dbUrl + sourceDb)
.setQuery(selectQuery)
.setRowTypeInfo(rowTypeInfo)
.setUsername(dbUser)
.setPassword(dbPassword);
DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table customerTable =
tableEnv.fromDataStream(source).as("id", "name", "test");
tableEnv.createTemporaryView("***", ***Table);
Table resultTable = tableEnv.sqlQuery(
"SELECT * FROM ***");
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
resultStream.print();
env.execute();
我是 Flink 的新手,我目前正在研究为所有这些提供的 API,但我实际上无法理解我做错了什么。在我看来,通过在作业结束时打印结果来测试这个过程似乎很简单,但我唯一打印出来的是这样的:
2022-02-14 12:22:57,702 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.
这项工作的重点是创建一个临时 table 视图,用于缓存一些静态数据,这些数据将通过查询该 table 视图用于其他 Flink 作业。
首先,测试mysql的数据是否可以正常读取 也许你可以直接打印源结果如下
DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
source.print()
env.execute();
有关如何将 MySQL 与 Flink 一起使用的更多上下文,请参阅
至于您尝试过的方法,不鼓励对流作业使用 createInput
,因为这不适用于 Flink 的检查点机制。与其使用 hadoop 输入格式,不如选择一种可用的源连接器。
临时视图不包含任何数据,也不是可以从其他作业访问的内容。 Flink table 或视图是描述存储在其他地方(例如 mysql 或 kafka)的数据如何被 Flink 解释为 table 的元数据。您可以将视图存储在目录中,以便多个作业可以共享其定义,但底层数据将保留在外部数据存储中,只有视图元数据存储在目录中。
所以在这种情况下,您编写的作业将创建一个临时视图,该视图仅对该作业可见,对其他作业不可见(因为它是临时视图,而不是存储在持久目录中的持久视图) .您的作业的输出不会在日志文件中,而是转到标准输出,或每个任务管理器的日志目录中的 *.out
文件。