如何从 Mysql 数据库创建 DataStreamSource?

How to create a DataStreamSource from a Mysql Database?

我有一个问题 运行 一个 flink 作业基本上是 运行 对 mysql 数据库的查询,然后尝试创建一个必须从不同的数据库访问的临时视图工作。

public static void main(String[] args) throws Exception {
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    final TypeInformation<?>[] fieldTypes =
        new TypeInformation<?>[] {
          BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO
        };

    final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

    String selectQuery = "select * from ***";
    String driverName = "***";
    String sourceDb = "***";
    String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
    String dbPassword = "***";
    String dbUser = "***";

    JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
        JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername(driverName)
            .setDBUrl(dbUrl + sourceDb)
            .setQuery(selectQuery)
            .setRowTypeInfo(rowTypeInfo)
            .setUsername(dbUser)
            .setPassword(dbPassword);

    DataStreamSource<Row> source = env.createInput(inputBuilder.finish());

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Table customerTable =
            tableEnv.fromDataStream(source).as("id", "name", "test");

    tableEnv.createTemporaryView("***", ***Table);
    Table resultTable = tableEnv.sqlQuery(
            "SELECT * FROM ***");

    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

    resultStream.print();
    env.execute();

我是 Flink 的新手,我目前正在研究为所有这些提供的 API,但我实际上无法理解我做错了什么。在我看来,通过在作业结束时打印结果来测试这个过程似乎很简单,但我唯一打印出来的是这样的:

2022-02-14 12:22:57,702 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.

这项工作的重点是创建一个临时 table 视图,用于缓存一些静态数据,这些数据将通过查询该 table 视图用于其他 Flink 作业。

首先,测试mysql的数据是否可以正常读取 也许你可以直接打印源结果如下

DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
source.print()
env.execute();

有关如何将 MySQL 与 Flink 一起使用的更多上下文,请参阅 。作为流式数据源,使用 MySQL 的 write-ahead-log 作为 CDC 流更常见,但有时采用的另一种方法(但 Flink 的 API 不鼓励)是定期轮询 MySQL 使用 SELECT 查询。

至于您尝试过的方法,不鼓励对流作业使用 createInput,因为这不适用于 Flink 的检查点机制。与其使用 hadoop 输入格式,不如选择一种可用的源连接器。

临时视图不包含任何数据,也不是可以从其他作业访问的内容。 Flink table 或视图是描述存储在其他地方(例如 mysql 或 kafka)的数据如何被 Flink 解释为 table 的元数据。您可以将视图存储在目录中,以便多个作业可以共享其定义,但底层数据将保留在外部数据存储中,只有视图元数据存储在目录中。

所以在这种情况下,您编写的作业将创建一个临时视图,该视图仅对该作业可见,对其他作业不可见(因为它是临时视图,而不是存储在持久目录中的持久视图) .您的作业的输出不会在日志文件中,而是转到标准输出,或每个任务管理器的日志目录中的 *.out 文件。