Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory'
I have a Flink job that runs fine locally, but it fails when I try to run it on a cluster with flink run. The error occurs while loading data from Kafka via 'connector' = 'kafka'. I am using the Flink Table API with the confluent-avro format to read from Kafka.
So basically I create a table that reads from a Kafka topic:
val inputTableSQL =
s"""CREATE TABLE input_table (
| -- key of the topic
| key BYTES NOT NULL,
|
| -- a few columns mapped to the Avro fields of the Kafka value
| id STRING
|
|) WITH (
|
| 'connector' = 'kafka',
| 'topic' = '${KafkaConfiguration.InputTopicName}',
| 'scan.startup.mode' = 'latest-offset',
|
| -- UTF-8 string as Kafka keys, using the 'key' table column
| 'key.format' = 'raw',
| 'key.fields' = 'key',
|
| 'value.format' = 'avro-confluent',
| 'value.avro-confluent.schema-registry.url' = '${KafkaConfiguration.KafkaConsumerSchemaRegistryUrl}',
| 'value.fields-include' = 'EXCEPT_KEY'
|)
|""".stripMargin
val inputTable = tableEnv.executeSql(inputTableSQL)
Then I create another table, which I use as the output table:
val outputTableSQL =
s"""CREATE TABLE custom_avro_output_table (
| -- key of the topic
| key BYTES NOT NULL,
|
| -- a few columns mapped to the Avro fields of the Kafka value
| ID STRING
|) WITH (
|
| 'connector' = 'kafka',
| 'topic' = '${KafkaConfiguration.OutputTopicName}',
| 'properties.bootstrap.servers' = '${KafkaConfiguration.KafkaProducerBootstrapServers}',
|
| -- UTF-8 string as Kafka keys, using the 'key' table column
| 'key.format' = 'raw',
| 'key.fields' = 'key',
|
| $outputFormatSettings
| 'value.fields-include' = 'EXCEPT_KEY'
|)
|""".stripMargin
val outputTableCreationResult = tableEnv.executeSql(outputTableSQL)
val customInsertSQL =
"""INSERT INTO custom_avro_output_table
|SELECT key, id
| FROM input_table
| WHERE userAgent LIKE '%ost%'
|""".stripMargin
val customInsertResult = tableEnv.executeSql(customInsertSQL)
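For completeness, the snippets above assume a tableEnv created via the Scala bridge; a minimal sketch of how it might be set up (assuming a streaming TableEnvironment, this part is not shown in the original code):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Assumed setup: a streaming TableEnvironment from flink-table-api-scala-bridge.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)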
When I run this on my local machine everything works fine, but when I run it on the cluster it crashes with:
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 13 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Available factory identifiers are:
canal-json
csv
debezium-json
json
maxwell-json
raw
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319) ~[flink-table_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:751) ~[flink-table_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:649) ~[flink-table_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:633) ~[flink-table_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.lambda$getValueDecodingFormat(KafkaDynamicTableFactory.java:279) ~[?:?]
at java.util.Optional.orElseGet(Optional.java:267) ~[?:1.8.0_282]
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:277) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:142) ~[?:?]
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ~[flink-table_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[flink-table_2.12-1.13.1.jar:1.13.1]
Here is my build.sbt:
val flinkVersion = "1.13.1"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" %% "flink-clients" % flinkVersion % Provided,
"org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % Provided,
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % Provided,
"org.apache.flink" % "flink-table-common" % flinkVersion % Provided,
"org.apache.flink" % "flink-avro-confluent-registry" % flinkVersion,
"org.apache.flink" % "flink-json" % flinkVersion,
"com.webtrekk" % "wd.generated" % "2.2.3",
"com.webtrekk" % "wd.generated.public" % "2.2.0",
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
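Note that flink-avro-confluent-registry is not marked Provided, so it should end up in the fat JAR. Whether its format factory is actually visible at runtime can be checked via Java's ServiceLoader, which is how Flink discovers table factories from META-INF/services. A rough diagnostic sketch (the object name ListFactories is made up) that prints every factory identifier on the current classpath:

import java.util.ServiceLoader
import org.apache.flink.table.factories.Factory
import scala.collection.JavaConverters._

object ListFactories extends App {
  // Prints every table factory registered via META-INF/services on the classpath;
  // 'avro-confluent' should be among them when packaging is correct.
  ServiceLoader.load(classOf[Factory]).asScala
    .foreach(f => println(s"${f.factoryIdentifier()} -> ${f.getClass.getName}"))
}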
A similar issue has been posted before, but adding the solution suggested there did not work in my case.
I was able to solve this issue as follows:
In my build.sbt I had the following mergeStrategy:
lazy val mergeStrategy = Seq(
assembly / assemblyMergeStrategy := {
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case _ => MergeStrategy.first
}
)
I appended the following cases to it, which resolved my exception:
case "META-INF/services/org.apache.flink.table.factories.Factory" => MergeStrategy.concat
case "META-INF/services/org.apache.flink.table.factories.TableFactory" => MergeStrategy.concat