Kafka Connect error: com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
Kafka Connect error: com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
我正在尝试将数据从 kafka 提取到 aerospike 中。我在发送的 kafka 消息中缺少什么?
我正在将以下数据发送到 kafka 以推送到 aerospike:
ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}
Kafka connect 出现以下错误:
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException:记录中缺少用户密钥
[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put.invokeSuspend(AerospikeSinkTask.kt:220)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)
aerospike-kafka-inbound.yml 文件:
GNU nano 4.8 /home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml
# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connect/streaming-to-asdb/from-kafka-to-asdb-overview.html
# for details.
# Map of Kafka topic name to its configuration.
topics:
phone: # Kafka topic name.
invalid-record: ignore # not Kill task on invalid record.
mapping: # Config to convert Kafka record to Aerospike record.
namespace: # Aerospike record namespace config.
mode: static
value: test
set: # Aerospike record set config.
mode: static
value: t1
key-field: # Aerospike record key config.
source: key # Use Kafka record key as the Aerospike record key.
bins: # Aerospike record bins config.
type: multi-bins
# all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
map:
name:
source: value-field
field-name: firstName
# The Aerospike cluster connection properties.
aerospike:
seeds:
- 127.0.0.1:
port: 3000
您在发送 kafka 消息时似乎没有指定密钥。默认情况下,Kafka 发送一个空密钥,您的配置说使用 kafka 密钥作为 aerospike 密钥。为了发送 kafka 密钥,您需要将 parse.key 设置为 true 并指定您的分隔符(在 kafka 生产者中)。
请参阅此处的第 8 步
https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html
kafka-console-producer \
--topic orders \
--bootstrap-server broker:9092 \
--property parse.key=true \
--property key.separator=":"
这两个属性告诉 kafka 生产者期望消息中有一个键和一个分隔符来区分键和值。
在此示例中,有两条记录,一条带有键 foo
,另一条带有 fun
。
foo:bar
fun:programming
这将导致这两条记录被写入 aerospike,其主键与 kafka 键 foo
和 fun
.
匹配
我正在尝试将数据从 kafka 提取到 aerospike 中。我在发送的 kafka 消息中缺少什么?
我正在将以下数据发送到 kafka 以推送到 aerospike:
ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}
Kafka connect 出现以下错误:
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException:记录中缺少用户密钥
[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put.invokeSuspend(AerospikeSinkTask.kt:220)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)
aerospike-kafka-inbound.yml 文件:
GNU nano 4.8 /home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml
# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connect/streaming-to-asdb/from-kafka-to-asdb-overview.html
# for details.
# Map of Kafka topic name to its configuration.
topics:
phone: # Kafka topic name.
invalid-record: ignore # not Kill task on invalid record.
mapping: # Config to convert Kafka record to Aerospike record.
namespace: # Aerospike record namespace config.
mode: static
value: test
set: # Aerospike record set config.
mode: static
value: t1
key-field: # Aerospike record key config.
source: key # Use Kafka record key as the Aerospike record key.
bins: # Aerospike record bins config.
type: multi-bins
# all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
map:
name:
source: value-field
field-name: firstName
# The Aerospike cluster connection properties.
aerospike:
seeds:
- 127.0.0.1:
port: 3000
您在发送 kafka 消息时似乎没有指定密钥。默认情况下,Kafka 发送一个空密钥,您的配置说使用 kafka 密钥作为 aerospike 密钥。为了发送 kafka 密钥,您需要将 parse.key 设置为 true 并指定您的分隔符(在 kafka 生产者中)。
请参阅此处的第 8 步
https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html
kafka-console-producer \
--topic orders \
--bootstrap-server broker:9092 \
--property parse.key=true \
--property key.separator=":"
这两个属性告诉 kafka 生产者期望消息中有一个键和一个分隔符来区分键和值。
在此示例中,有两条记录,一条带有键 foo
,另一条带有 fun
。
foo:bar
fun:programming
这将导致这两条记录被写入 aerospike,其主键与 kafka 键 foo
和 fun
.