如何在 Apache Flink 中创建外部目录 Table
How can I create an External Catalog Table in Apache Flink
我尝试创建并在 Apache Flink 中使用 ExternalCatalog Table。我创建并添加到 Flink table 环境(这里是官方 documentation)。出于某种原因,'catalog' 中唯一存在的外部 table,在扫描期间未找到。我在上面的代码中错过了什么?
val catalogName = s"externalCatalog$fileNumber"
val ec: ExternalCatalog = getExternalCatalog(catalogName, 1, tableEnv)
tableEnv.registerExternalCatalog(catalogName, ec)
val s1: Table = tableEnv.scan("S_EXT")
def getExternalCatalog(catalogName: String, fileNumber: Int, tableEnv: BatchTableEnvironment): ExternalCatalog = {
val cat = new InMemoryExternalCatalog(catalogName)
// external Catalog table
val externalCatalogTableS = getExternalCatalogTable("S")
// add external Catalog table
cat.createTable("S_EXT", externalCatalogTableS, ignoreIfExists = false)
cat
}
private def getExternalCatalogTable(fileName: String): ExternalCatalogTable = {
// connector descriptor
val connectorDescriptor = new FileSystem()
connectorDescriptor.path(getFilePath(fileNumber, fileName))
// format
val fd = new Csv()
fd.field("X", Types.STRING)
fd.field("Y", Types.STRING)
fd.fieldDelimiter(",")
// statistic
val statistics = new Statistics()
statistics.rowCount(0)
// metadata
val md = new Metadata()
ExternalCatalogTable.builder(connectorDescriptor)
.withFormat(fd)
.withStatistics(statistics)
.withMetadata(md)
.asTableSource()
}
上面的例子是test file in git的一部分。
这可能是命名空间问题。外部目录中的表由目录名称列表(可能是模式)标识,最后是 table 名称。
在您的示例中,以下内容应该有效:
val s1: Table = tableEnv.scan("externalCatalog1", "S_EXT")
您可以查看 ExternalCatalogTest 以了解如何使用外部目录。
我尝试创建并在 Apache Flink 中使用 ExternalCatalog Table。我创建并添加到 Flink table 环境(这里是官方 documentation)。出于某种原因,'catalog' 中唯一存在的外部 table,在扫描期间未找到。我在上面的代码中错过了什么?
val catalogName = s"externalCatalog$fileNumber"
val ec: ExternalCatalog = getExternalCatalog(catalogName, 1, tableEnv)
tableEnv.registerExternalCatalog(catalogName, ec)
val s1: Table = tableEnv.scan("S_EXT")
def getExternalCatalog(catalogName: String, fileNumber: Int, tableEnv: BatchTableEnvironment): ExternalCatalog = {
val cat = new InMemoryExternalCatalog(catalogName)
// external Catalog table
val externalCatalogTableS = getExternalCatalogTable("S")
// add external Catalog table
cat.createTable("S_EXT", externalCatalogTableS, ignoreIfExists = false)
cat
}
private def getExternalCatalogTable(fileName: String): ExternalCatalogTable = {
// connector descriptor
val connectorDescriptor = new FileSystem()
connectorDescriptor.path(getFilePath(fileNumber, fileName))
// format
val fd = new Csv()
fd.field("X", Types.STRING)
fd.field("Y", Types.STRING)
fd.fieldDelimiter(",")
// statistic
val statistics = new Statistics()
statistics.rowCount(0)
// metadata
val md = new Metadata()
ExternalCatalogTable.builder(connectorDescriptor)
.withFormat(fd)
.withStatistics(statistics)
.withMetadata(md)
.asTableSource()
}
上面的例子是test file in git的一部分。
这可能是命名空间问题。外部目录中的表由目录名称列表(可能是模式)标识,最后是 table 名称。
在您的示例中,以下内容应该有效:
val s1: Table = tableEnv.scan("externalCatalog1", "S_EXT")
您可以查看 ExternalCatalogTest 以了解如何使用外部目录。