How can I read and decode AVRO messages from Kafka, along with their associated Kafka key, using Benthos?
I'm using Benthos to read AVRO-encoded messages from Kafka, and these messages have their kafka_key metadata field set to an AVRO-encoded payload as well. The schemas for these AVRO payloads are stored in a Schema Registry, and Benthos has a schema_registry_decode processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields: one called content, holding the decoded AVRO message, and the other called metadata, holding the various metadata fields collected by Benthos, including the decoded kafka_key payload.
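To make the target shape concrete, each output document should look roughly like the sketch below. All values and the content/key fields are hypothetical placeholders, and the exact metadata keys depend on what the kafka input attaches to each message:

    {
      "content": {
        "name": "example",
        "value": 42
      },
      "metadata": {
        "kafka_topic": "benthos_input",
        "kafka_partition": 0,
        "kafka_offset": 123,
        "kafka_key": {
          "id": "example-key"
        }
      }
    }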
It turns out this can be achieved using the branch processor, as follows:
input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate the output content field
    - bloblang: |
        root.content = this

    # Decode the kafka_key metadata payload and populate the output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")
        processors:
          - schema_registry_decode:
              url: http://localhost:8081
        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}
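Running Benthos against this config (for example: benthos -c config.yaml) writes one JSON document per consumed message to stdout. The branch processor is what allows the second decode without clobbering the already-decoded value: request_map feeds only the raw kafka_key bytes into the nested schema_registry_decode, and result_map then merges the collected metadata together with the decoded key back into the metadata field, leaving root.content untouched.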