Beam JdbcIO with Oracle: NUMBER data type causes IllegalArgumentException
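import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// `pipe` (the Pipeline) and `configuration` (the JdbcIO.DataSourceConfiguration) are created elsewhere.
String sql = "select CUSTOMER_ID, CITY, CUSTOMER_FIRSTTNAME from LIMA.CHECK_FLOW where CUSTOMER_ID < 1000";
Schema schema = Schema.of(
    Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32),
    Schema.Field.of("CITY", Schema.FieldType.STRING),
    Schema.Field.of("CUSTOMER_FIRSTTNAME", Schema.FieldType.STRING)
);
// Read the query result as Beam Rows, then attach the schema declared above.
PCollection<Row> result = pipe.apply(JdbcIO.readRows()
        .withDataSourceConfiguration(configuration)
        .withQuery(sql)
        .withFetchSize(1000)
    ).setCoder(RowCoder.of(schema)).setRowSchema(schema);
PipelineResult nRes = pipe.run();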
We are testing Beam to see whether it fits our use case. The code above is a simple example. I get the following error:
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:208)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:268)
at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:246)
at org.apache.beam.sdk.io.jdbc.SchemaUtil.lambda$createLogicalTypeExtractor$ca0ab2ec(SchemaUtil.java:279)
at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:344)
at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:322)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:924)
Any help resolving this error would be greatly appreciated.
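I ran into the same problem, and it has been fixed as part of this pull request.
The release containing the fix is expected around January 2022. Until then, you can work around it by creating the org/apache/beam/sdk/io/jdbc/LogicalTypes.java class in your own project and copying the contents of this file into it.
I have the same suggestion.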
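For context on why a NUMBER column can trip this kind of check: the trace ends in a `Preconditions.checkArgument` call inside `LogicalTypes$FixedPrecisionNumeric.toInputType`, which takes a `BigDecimal`. The exact condition is not shown in the trace, so the following is only a sketch using the JDK and no Beam classes. It assumes a check that compares the value's precision and scale against declared ones, and it assumes that the Oracle driver reports precision 0 and scale -127 for a NUMBER column declared without precision. The class `NumericCheckSketch` and its methods `fits` and `checked` are hypothetical names made up for illustration, not Beam's actual code.

```java
import java.math.BigDecimal;

public class NumericCheckSketch {

    // Hypothetical stand-in for a fixed-precision check (assumption, not Beam's code):
    // a value fits if it is null or stays within the declared precision and scale.
    static boolean fits(BigDecimal value, int declaredPrecision, int declaredScale) {
        return value == null
                || (value.precision() <= declaredPrecision && value.scale() <= declaredScale);
    }

    // Mirrors the shape of the failure in the trace: a message-less IllegalArgumentException.
    static BigDecimal checked(BigDecimal value, int declaredPrecision, int declaredScale) {
        if (!fits(value, declaredPrecision, declaredScale)) {
            throw new IllegalArgumentException();
        }
        return value;
    }

    public static void main(String[] args) {
        BigDecimal customerId = new BigDecimal("999"); // precision 3, scale 0

        // Column declared as NUMBER(10,0): the value is within bounds.
        System.out.println(fits(customerId, 10, 0)); // true

        // Column declared as plain NUMBER, assuming the driver reports
        // precision 0 and scale -127: no real value can satisfy the check.
        System.out.println(fits(customerId, 0, -127)); // false
    }
}
```

If this assumption matches your table, checking the precision and scale that `ResultSetMetaData` reports for `CUSTOMER_ID` should confirm whether the column is an unconstrained NUMBER.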