Kafka Connect error: com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record

I am trying to ingest data from Kafka into Aerospike. What am I missing in the Kafka message I am sending?

I am sending the following data to Kafka to be pushed into Aerospike:

ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}

Kafka Connect reports the following error:

com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record

[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put.invokeSuspend(AerospikeSinkTask.kt:220)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)

The aerospike-kafka-inbound.yml file:

GNU nano 4.8 /home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml

# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connect/streaming-to-asdb/from-kafka-to-asdb-overview.html
# for details.

# Map of Kafka topic name to its configuration.
topics:
  phone: # Kafka topic name.
    invalid-record: ignore # Do not kill the task on an invalid record.
    mapping: # Config to convert Kafka record to Aerospike record.
      namespace: # Aerospike record namespace config.
        mode: static
        value: test
      set: # Aerospike record set config.
        mode: static
        value: t1
      key-field: # Aerospike record key config.
        source: key  # Use Kafka record key as the Aerospike record key.
      bins: # Aerospike record bins config.
        type: multi-bins
        # all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
        map:
          name:
            source: value-field
            field-name: firstName
    # The Aerospike cluster connection properties.
aerospike:
  seeds:
    - 127.0.0.1:
        port: 3000

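It looks like you are not specifying a key when you send the Kafka message. The log line confirms this: the record arrives with key=null. By default, the console producer sends a null key, and your configuration (key-field with source: key) says to use the Kafka record key as the Aerospike record key. To send a Kafka key, set parse.key to true and specify a key separator in the Kafka producer.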

See step 8 here:

https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html

kafka-console-producer \
  --topic orders \
  --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

These two properties tell the Kafka producer to expect a key in each message, with a separator that divides the key from the value.

In this example there are two records, one with the key foo and the other with the key fun:

foo:bar
fun:programming

This results in both records being written to Aerospike with primary keys that match the Kafka keys foo and fun.
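
The message in the question is JSON, which itself contains colons, so it helps to see how a keyed line is divided. The following is only a sketch in plain shell, not Kafka code: it assumes the console producer splits each line at the first occurrence of key.separator. The helper name split_record, the key user1, and the "|" separator are illustrative choices, not something from the question or the connector.

```shell
#!/bin/sh
# Sketch only: mimic splitting a console-producer input line into key and
# value at the FIRST occurrence of the separator (assumed behaviour when
# parse.key=true). split_record is a hypothetical helper, not a Kafka tool.
split_record() {
  sep=$1
  line=$2
  key=${line%%"$sep"*}    # everything before the first separator
  value=${line#*"$sep"}   # everything after the first separator
}

# The answer's example: key "foo", value "bar".
split_record ":" 'foo:bar'
echo "key=$key value=$value"

# A JSON value with a separator that cannot appear inside the key.
# "user1" and "|" are illustrative assumptions.
split_record "|" 'user1|{"payload":{"name":"Anuj"}}'
echo "key=$key value=$value"

# With the question's topic, the producer would then be started as:
#   bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092 \
#     --property parse.key=true --property "key.separator=|"
# and each input line would carry the key, the separator, and the JSON value.
```

Picking a separator such as "|" that does not occur in the key keeps the split unambiguous even when the JSON value is full of colons.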