使用Flink从oracle中读取数据
Reading data from oracle using Flink
我正在尝试使用 Flink 与 Oracle 一起工作。只需执行一个简单的任务,将数据从 table 复制到新数据。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE ExistedTable(\n" +
" quoteid BIGINT,\n" +
" requestid BIGINT,\n" +
" createddt DATE,\n" +
" PRIMARY KEY (quoteid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
" 'table-name' = 'TableName',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'username' = 'UserName',\n" +
" 'password' = 'Password'\n" +
")");
tEnv.executeSql("CREATE TABLE NewTable (\n" +
" quoteid BIGINT,\n" +
" requestid BIGINT,\n" +
" createddt DATE,\n" +
" PRIMARY KEY (quoteid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
" 'table-name' = 'NewTableName',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'username' = 'UserName',\n" +
" 'password' = 'Password'\n" +
")");
Table data= tEnv.from("ExistedTable");
data.executeInsert("NewTable");
当运行我有错误
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
Table options are:
'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
我的sqlconnection有没有错误。我找不到任何使用 oracle 的示例。
谢谢,
您使用的是哪个版本的 Flink?对 Oracle JDBC 的支持从 Flink 1.15 开始可用,但尚未发布。
我正在尝试使用 Flink 与 Oracle 一起工作。只需执行一个简单的任务,将数据从 table 复制到新数据。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE ExistedTable(\n" +
" quoteid BIGINT,\n" +
" requestid BIGINT,\n" +
" createddt DATE,\n" +
" PRIMARY KEY (quoteid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
" 'table-name' = 'TableName',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'username' = 'UserName',\n" +
" 'password' = 'Password'\n" +
")");
tEnv.executeSql("CREATE TABLE NewTable (\n" +
" quoteid BIGINT,\n" +
" requestid BIGINT,\n" +
" createddt DATE,\n" +
" PRIMARY KEY (quoteid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n" +
" 'table-name' = 'NewTableName',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'username' = 'UserName',\n" +
" 'password' = 'Password'\n" +
")");
Table data= tEnv.from("ExistedTable");
data.executeInsert("NewTable");
当运行我有错误
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
Table options are:
'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
我的sqlconnection有没有错误。我找不到任何使用 oracle 的示例。 谢谢,
您使用的是哪个版本的 Flink?对 Oracle JDBC 的支持从 Flink 1.15 开始可用,但尚未发布。