Error With RowKey Definition on Confluent BigTable Sink Connector
I am trying to use Confluent's BigTable Sink connector to read data from Kafka and write it into my BigTable instance, but I am getting the following error message:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with RowKey definition: Row key definition was defined, but received, deserialized kafka key is not a struct. Unable to construct a row key.
at io.confluent.connect.bigtable.client.RowKeyExtractor.getRowKey(RowKeyExtractor.java:69)
at io.confluent.connect.bigtable.client.BufferedWriter.addWriteToBatch(BufferedWriter.java:84)
at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:47)
at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Due to some technical limitations, the message producer cannot produce messages with a key attribute, so I am using a few transforms to take fields from the payload and set them as the message key.
Here is my connector payload:
{
  "name" : "DATALAKE.BIGTABLE.SINK.QUEUEING.ZTXXD",
  "config" : {
    "connector.class" : "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "topics" : "APP-DATALAKE-QUEUEING-ZTXXD_DATALAKE-V1",
    "transforms" : "HoistField,AddKeys,ExtractKey",
    "gcp.bigtable.project.id" : "bigtable-project-id",
    "gcp.bigtable.instance.id" : "bigtable-instance-id",
    "gcp.bigtable.credentials.json" : "XXXXX",
    "transforms.ExtractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.HoistField.field" : "raw_data_cf",
    "transforms.ExtractKey.field" : "KEY1,ATT1",
    "transforms.HoistField.type" : "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.AddKeys.type" : "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.AddKeys.fields" : "KEY1,ATT1",
    "row.key.definition" : "KEY1,ATT1",
    "table.name.format" : "raw_ZTXXD_DATALAKE",
    "consumer.override.group.id" : "svc-datalake-KAFKA_2_BIGTABLE",
    "confluent.topic.bootstrap.servers" : "xxxxxx:9092",
    "input.data.format" : "JSON",
    "confluent.topic" : "_dsp-confluent-license",
    "input.key.format" : "STRING",
    "key.converter.schemas.enable" : "false",
    "confluent.topic.security.protocol" : "SASL_SSL",
    "row.key.delimiter" : "/",
    "confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXXXX\" password=\"XXXXXX\";",
    "value.converter.schemas.enable" : "false",
    "auto.create.tables" : "true",
    "auto.create.column.families" : "true",
    "confluent.topic.sasl.mechanism" : "PLAIN"
  }
}
Here is the message I am sending to Kafka:
{
  "MANDT": "110",
  "KEY1": "1",
  "KEY2": null,
  "ATT1": "1M",
  "ATT2": "0000000000",
  "TABLE_NAME": "ZTXXD_DATALAKE",
  "IUUC_OPERATION": "I",
  "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
}
In my transforms I perform three operations:
HoistField wraps my payload in a two-level structure (the BigTable connector documentation says the connector needs a two-level structure so that it can infer the column family and columns); see the sketch after this list.
AddKeys adds the columns I consider key columns to the message key.
ExtractKey removes the field names from the key that was just added, leaving only the values themselves.
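For example, as I understand HoistField$Value with field set to raw_data_cf, the value should end up wrapped like this (this is the structure I am aiming for, not output I captured from the connector):
{
  "raw_data_cf" : {
    "MANDT": "110",
    "KEY1": "1",
    "KEY2": null,
    "ATT1": "1M",
    "ATT2": "0000000000",
    "TABLE_NAME": "ZTXXD_DATALAKE",
    "IUUC_OPERATION": "I",
    "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
  }
}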
I have been reading the documentation for this Bigtable connector, and it is not clear to me whether the connector works with the JSON format. Can you tell me?
JSON should work, but...
deserialized kafka key is not a struct
That is because you have set the schemas.enable=false property on your value converter, so when ValueToKey runs, the value is not a Connect Struct type; HoistField has produced a Java Map instead.
If you cannot use the Schema Registry and switch serialization formats, then you would need to find a way for the REST Proxy to infer the schema of the JSON message before it produces the data (I don't think it can). Otherwise, your records need to contain schema and payload fields, and you need to enable schemas on the converter. Explained here
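For reference, the envelope that the JsonConverter expects when schemas.enable=true looks roughly like this; the field types below are only a sketch inferred from your sample message, not something the connector dictates:
{
  "schema" : {
    "type" : "struct",
    "optional" : false,
    "fields" : [
      { "field" : "MANDT", "type" : "string", "optional" : true },
      { "field" : "KEY1", "type" : "string", "optional" : false },
      { "field" : "KEY2", "type" : "string", "optional" : true },
      { "field" : "ATT1", "type" : "string", "optional" : true },
      { "field" : "ATT2", "type" : "string", "optional" : true },
      { "field" : "TABLE_NAME", "type" : "string", "optional" : true },
      { "field" : "IUUC_OPERATION", "type" : "string", "optional" : true },
      { "field" : "CREATETIMESTAMP", "type" : "string", "optional" : true }
    ]
  },
  "payload" : {
    "MANDT": "110",
    "KEY1": "1",
    "KEY2": null,
    "ATT1": "1M",
    "ATT2": "0000000000",
    "TABLE_NAME": "ZTXXD_DATALAKE",
    "IUUC_OPERATION": "I",
    "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
  }
}
With messages in that shape, you would also set "value.converter.schemas.enable" : "true" in the connector config so the JsonConverter deserializes the payload into a Connect Struct that ValueToKey can then turn into a struct key.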
Another option - there may be a transform project out there that sets a record's schema, but it is not built-in... (it is not part of SetSchemaMetadata)