使用具有可变元组大小的 JDBCInputFormat (apache-flink)

Using JDBCInputFormat with variable tuple size (apache-flink)

我想在 java 中编写一个通用的 flink 作业,它可以接受任何 SQL-SELECT 查询,运行 它针对 SQL -数据库并将其写入 Elasticsearch 索引。

我必须解决的问题之一是为 JDBC-Connection 创建数据源。我想使用 JDBCInputFormat. I followed the example in the documentation data source.

问题是,必须指定通用类型 DataSource 类型。而且我只能使用 Tuple 类型,因为 JDBCInputFormat 通用类型 OUT 扩展了 Tuple。但是我在编译时不知道我将使用哪个Tuple

  1. 我是不是理解有误?
  2. 还有其他jdbc InputFormat我可以用吗?
  3. 有没有办法将 Tuple 指定为通用类型?

我用的是java7和apache-flink 0.10.2

我尝试使用 Tuple25 其中只有字符串,但出现异常。

下面是代码,然后是异常。

DataSource<StringsTuple25> database = flink.createInput(
JDBCInputFormat.buildJDBCInputFormat()//
  .setDrivername(getDatabaseDriverName())//
  .setDBUrl(getDatabaseUrl())//
  .setUsername(getDatabaseUsername())//
  .setPassword(getDatabasePassword())//
  .setQuery(getQuery())//
  .finish(), 
  StringsTuple25.typeInformation()
);

我的StringTuple25class

public class StringsTuple25 extends
  Tuple25<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String> {

  private static final long serialVersionUID = 1L;

  public static TypeInformation<?> typeInformation() {
    TypeInformation<String>[] types = new TypeInformation[25];
    Arrays.fill(types, STRING_TYPE_INFO);
    return new TupleTypeInfo<>(Tuple25.class,types);
  }
}

我得到这个例外:

Caused by: java.io.IOException: Tuple size does not match columncount
  at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.extractTypes(JDBCInputFormat.java:180)
  at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:162)
  at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
  at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
  at java.lang.Thread.run(Thread.java:745)

如错误所示,您使用的 Tuple 类型的属性数必须与您在 SQL 查询中选择的列数相匹配。此外,每个属性的数据类型必须匹配。

例如,如果您 SELECT id, name FROM ...idINTEGER 并且 nameVARCHAR,您将指定使用 DataStream<Tuple2<Integer,String>>(或专门化您自己的 class class MyResultType extends Tuple2<Integer,String>DataStream<MyResultType>) 并提供相应的 TypeInformation.

您也可以使用通用 Tuple 类型。您的流将是 DataStream<Tuple>(未指定属性的数量或类型)。但是,对于 TypeInformation,您 需要知道属性的数量

Tuple t = Tuple.getTupleClass(numberOfAttributes).newInstance();
for(int i = 0; i < numberOfAttributes; i++) {
    t.setField("", i);
}
TypeInformation<Tuple> typeInfo = TypeExtractor.getForObject(t);

因此,您需要根据定义 SQL 查询的给定参数推断所选属性的数量。