使用Flink从oracle中读取数据

Reading data from oracle using Flink

我正在尝试使用 Flink 与 Oracle 一起工作。只需执行一个简单的任务,将数据从 table 复制到新数据。

 EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE ExistedTable(\n" +
                "    quoteid  BIGINT,\n" +
                "    requestid      BIGINT,\n" +
                "    createddt DATE,\n" +           
                "    PRIMARY KEY (quoteid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
                "    'table-name'    = 'TableName',\n" +
                "   'driver'     = 'oracle.jdbc.driver.OracleDriver',\n" +
                "    'username'    = 'UserName',\n" +
                "    'password'    = 'Password'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE NewTable (\n" +
                "    quoteid  BIGINT,\n" +
                "    requestid      BIGINT,\n" +
                "    createddt DATE,\n" +           
                "    PRIMARY KEY (quoteid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
                "    'table-name'    = 'NewTableName',\n" +
                "   'driver'     = 'oracle.jdbc.driver.OracleDriver',\n" +
                "    'username'    = 'UserName',\n" +
                "    'password'    = 'Password'\n" +
                ")");

        Table data= tEnv.from("ExistedTable");
        data.executeInsert("NewTable");

当运行我有错误

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.

Table options are:

'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.

我的sqlconnection有没有错误。我找不到任何使用 oracle 的示例。 谢谢,

您使用的是哪个版本的 Flink?对 Oracle JDBC 的支持从 Flink 1.15 开始可用,但尚未发布。