Flink: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath
I am trying to connect Kafka to Flink and run it via sql-client.sh. However, no matter what I do with the .yaml file and the libraries, I keep getting the error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.
Catalog options are:
'type'='kafka'
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs(LegacyTableEnvironmentInitializer.java:120)
at java.util.HashMap.forEach(HashMap.java:1289)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.
Available factory identifiers are:
generic_in_memory
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
... 11 more
My sql-conf is very simple (I have left out sensitive information such as the bootstrap servers):
catalogs:
  - name: myKafka
    type: kafka
In addition, the library folder contains the following jars:
flink-avro-confluent-registry-1.13.2.jar
flink-connector-kafka_2.12-1.13.2.jar
flink-sql-connector-kafka_2.12-1.13.2.jar
kafka-clients-2.0.0-cdh6.1.1.jar
Flink version: 1.13.2. Kafka version: 2.0.0-cdh6.1.1.
Solution (thanks to @Niko for pointing me in the right direction):
I modified the sql-conf.yaml to use a hive catalog and created the Kafka table in SQL. So my sql-conf.yaml looks like:
execution:
  type: streaming
  result-mode: table
  planner: blink
  current-database: default
  current-catalog: myhive
catalogs:
  - name: myhive
    type: hive
    hive-version: 2.1.1-cdh6.0.1
    hive-conf-dir: /etc/hive/conf
deployment:
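  # Assumption: the legacy SQL client forwards entries in this section to the
  # Flink CLI as options, i.e. 'm' maps to -m (deployment target) and 'yqu'
  # to -yqu (YARN queue); values below are as sanitized by the author.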
  m: yarn-cluster
  yqu: ABC_XYZ
I ran it and, inside sql-client.sh, created the Kafka table with the necessary connection properties.
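After starting the client with this environment file, it is worth verifying that the Hive catalog was actually registered before creating the table. A minimal sketch (the catalog name comes from the YAML above):
-- List registered catalogs; 'myhive' should appear next to default_catalog
SHOW CATALOGS;
-- Switch into it and check the available databases
USE CATALOG myhive;
SHOW DATABASES;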
Every catalog defined in YAML must provide a type property that specifies the type of the catalog. The following types are supported out of the box:
- generic_in_memory
- hive
Note that kafka is not among them: Apache Flink does not ship a Kafka catalog. The Kafka jars you listed register a table connector factory, so the identifier 'kafka' is only valid in the WITH clause of a CREATE TABLE statement, not as a catalog type. You can read more about it in the official doc.
You can create a so-called initialization SQL file, for example:
CREATE CATALOG MyCatalog WITH (
  'type' = 'hive',
  'default-database' = 'my_database',
  'hive-conf-dir' = '/etc/hive/conf'
);
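-- Make MyCatalog the current catalog, so the table below is registered
-- in the Hive Metastore and its metadata survives across client sessions: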
USE CATALOG MyCatalog;
CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'YOUR_TOPIC',
  'properties.bootstrap.servers' = 'localhost',
  'properties.group.id' = 'some_id',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
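Since Flink 1.13 the SQL client can execute such a file at startup via sql-client.sh -i init.sql. Once the table exists, it can be queried like any other table; a minimal sketch using the fields from the DDL above:
-- Continuously reads from the Kafka topic; cancel the query to stop it
SELECT MyField1, MyField2 FROM MyTable;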