如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其相关的 kafka 密钥?

How can I read and decode AVRO messages from Kafka along with their associated kafka key using Benthos?

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,这些消息的 kafka_key 元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码的有效载荷的模式存储在模式注册表中,Benthos 有一个 schema_registry_decode processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields, one called content containing the decoded AVRO message and the other one called metadata containing the various metadata fields 由 Benthos 收集,包括解码的 kafka_key 有效载荷。

事实证明,可以使用 branch 处理器实现这一目标,如下所示:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}