使用具有可变元组大小的 JDBCInputFormat (apache-flink)
Using JDBCInputFormat with variable tuple size (apache-flink)
我想在 java 中编写一个通用的 flink 作业,它可以接受任何 SQL-SELECT 查询,运行 它针对 SQL -数据库并将其写入 Elasticsearch 索引。
我必须解决的问题之一是为 JDBC-Connection 创建数据源。我想使用 JDBCInputFormat. I followed the example in the documentation data source.
问题是,必须指定通用类型 DataSource
类型。而且我只能使用 Tuple
类型,因为 JDBCInputFormat
通用类型 OUT
扩展了 Tuple
。但是我在编译时不知道我将使用哪个Tuple
。
- 我是不是理解有误?
- 还有其他jdbc
InputFormat
我可以用吗?
- 有没有办法将
Tuple
指定为通用类型?
我用的是java7和apache-flink 0.10.2
我尝试使用 Tuple25
其中只有字符串,但出现异常。
下面是代码,然后是异常。
DataSource<StringsTuple25> database = flink.createInput(
JDBCInputFormat.buildJDBCInputFormat()//
.setDrivername(getDatabaseDriverName())//
.setDBUrl(getDatabaseUrl())//
.setUsername(getDatabaseUsername())//
.setPassword(getDatabasePassword())//
.setQuery(getQuery())//
.finish(),
StringsTuple25.typeInformation()
);
我的StringTuple25
class
public class StringsTuple25 extends
Tuple25<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String> {
private static final long serialVersionUID = 1L;
public static TypeInformation<?> typeInformation() {
TypeInformation<String>[] types = new TypeInformation[25];
Arrays.fill(types, STRING_TYPE_INFO);
return new TupleTypeInfo<>(Tuple25.class,types);
}
}
我得到这个例外:
Caused by: java.io.IOException: Tuple size does not match columncount
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.extractTypes(JDBCInputFormat.java:180)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:162)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
如错误所示,您使用的 Tuple
类型的属性数必须与您在 SQL 查询中选择的列数相匹配。此外,每个属性的数据类型必须匹配。
例如,如果您 SELECT id, name FROM ...
和 id
是 INTEGER
并且 name
是 VARCHAR
,您将指定使用 DataStream<Tuple2<Integer,String>>
(或专门化您自己的 class class MyResultType extends Tuple2<Integer,String>
和 DataStream<MyResultType>
) 并提供相应的 TypeInformation
.
您也可以使用通用 Tuple
类型。您的流将是 DataStream<Tuple>
(未指定属性的数量或类型)。但是,对于 TypeInformation
,您 需要知道属性的数量 。
Tuple t = Tuple.getTupleClass(numberOfAttributes).newInstance();
for(int i = 0; i < numberOfAttributes; i++) {
t.setField("", i);
}
TypeInformation<Tuple> typeInfo = TypeExtractor.getForObject(t);
因此,您需要根据定义 SQL 查询的给定参数推断所选属性的数量。
我想在 java 中编写一个通用的 flink 作业,它可以接受任何 SQL-SELECT 查询,运行 它针对 SQL -数据库并将其写入 Elasticsearch 索引。
我必须解决的问题之一是为 JDBC-Connection 创建数据源。我想使用 JDBCInputFormat. I followed the example in the documentation data source.
问题是,必须指定通用类型 DataSource
类型。而且我只能使用 Tuple
类型,因为 JDBCInputFormat
通用类型 OUT
扩展了 Tuple
。但是我在编译时不知道我将使用哪个Tuple
。
- 我是不是理解有误?
- 还有其他jdbc
InputFormat
我可以用吗? - 有没有办法将
Tuple
指定为通用类型?
我用的是java7和apache-flink 0.10.2
我尝试使用 Tuple25
其中只有字符串,但出现异常。
下面是代码,然后是异常。
DataSource<StringsTuple25> database = flink.createInput(
JDBCInputFormat.buildJDBCInputFormat()//
.setDrivername(getDatabaseDriverName())//
.setDBUrl(getDatabaseUrl())//
.setUsername(getDatabaseUsername())//
.setPassword(getDatabasePassword())//
.setQuery(getQuery())//
.finish(),
StringsTuple25.typeInformation()
);
我的StringTuple25
class
public class StringsTuple25 extends
Tuple25<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String> {
private static final long serialVersionUID = 1L;
public static TypeInformation<?> typeInformation() {
TypeInformation<String>[] types = new TypeInformation[25];
Arrays.fill(types, STRING_TYPE_INFO);
return new TupleTypeInfo<>(Tuple25.class,types);
}
}
我得到这个例外:
Caused by: java.io.IOException: Tuple size does not match columncount
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.extractTypes(JDBCInputFormat.java:180)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:162)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
如错误所示,您使用的 Tuple
类型的属性数必须与您在 SQL 查询中选择的列数相匹配。此外,每个属性的数据类型必须匹配。
例如,如果您 SELECT id, name FROM ...
和 id
是 INTEGER
并且 name
是 VARCHAR
,您将指定使用 DataStream<Tuple2<Integer,String>>
(或专门化您自己的 class class MyResultType extends Tuple2<Integer,String>
和 DataStream<MyResultType>
) 并提供相应的 TypeInformation
.
您也可以使用通用 Tuple
类型。您的流将是 DataStream<Tuple>
(未指定属性的数量或类型)。但是,对于 TypeInformation
,您 需要知道属性的数量 。
Tuple t = Tuple.getTupleClass(numberOfAttributes).newInstance();
for(int i = 0; i < numberOfAttributes; i++) {
t.setField("", i);
}
TypeInformation<Tuple> typeInfo = TypeExtractor.getForObject(t);
因此,您需要根据定义 SQL 查询的给定参数推断所选属性的数量。