aiokafka 中 json 消息的架构

Schema for json message in aiokafka

如何在 aiokafka 中为 json 消息添加架构?没有它,Kafka Connect 将无法工作。

import asyncio
import json
import random

import aiokafka
from faker import Faker


def serializer(value):
    return json.dumps(value).encode()


async def produce():
    fake = Faker(['ru_RU'])
    producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serializer)
    await producer.start()

    try:
        while True:
            message = {
                'name': fake.first_name(),
                'surname': fake.last_name(),
                'age': random.randint(20, 30)
            }
            await producer.send("mytopic", message)
    finally:
        await producer.stop()


asyncio.run(produce())

Kafka Connect cannot work without it

可以 如果你有 value.converter.schemas.enable=false。是否需要模式取决于特定的连接器,而不是连接本身。


如果您无法更改您的序列化程序,例如,更改为 confluent_kafka 库中的 Avro 序列化程序,那么您需要提供 schemapayload 作为您自己的每条消息

例如

message = { 'schema': { ... }, 
            'payload': {
              'name': fake.first_name(),
              'surname': fake.last_name(),
              'age': random.randint(20, 30)
            }
        }

更多信息:Kafka Connect Deep Dive – Converters and Serialization Explained