使用 Scala 在 Flink 中从 JDBC 源创建数据集

Create DataSet From JDBC Source in Flink using Scala

我正在尝试在 Flink 中使用 Scala 从 JDBC 源创建数据集,所有文档/其他 SO 问题似乎都使用 Java。我在使用泛型类型时遇到了一些问题。

到目前为止我有:

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                 .setDrivername(driver)
                 .setDBUrl(url)
                 .setUsername(username)
                 .setPassword(password)
                 .setQuery("select col_a,col_b from my_table")
                 .finish()

 env.createInput(inputFormat)

这给出了一个错误:

error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[?0]
                    env.createInput(inputFormat)

我也试过了

var tuple = ("",0)
inputFormat.nextRecord(tuple)

哪个给出了错误:

error: type mismatch;
 found   : (String, Int)
 required: ?0

最后我尝试了:

inputFormat.nextRecord(_)

结果是:

found   : x.type (with underlying type ?0)
 required: ?0

所以问题是如何使用 Scala 在 Flink 中设置 JDBC 连接/我哪里错了?

修复第一个问题:

error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[?0]
                    env.createInput(inputFormat)

您需要添加以下导入语句

import org.apache.flink.api.scala._