aiokafka 中 json 消息的架构
Schema for json message in aiokafka
如何在 aiokafka
中为 json 消息添加架构?没有它,Kafka Connect 将无法工作。
import asyncio
import json
import random
import aiokafka
from faker import Faker
def serializer(value):
return json.dumps(value).encode()
async def produce():
fake = Faker(['ru_RU'])
producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serializer)
await producer.start()
try:
while True:
message = {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
await producer.send("mytopic", message)
finally:
await producer.stop()
asyncio.run(produce())
Kafka Connect cannot work without it
它 可以 如果你有 value.converter.schemas.enable=false
。是否需要模式取决于特定的连接器,而不是连接本身。
如果您无法更改您的序列化程序,例如,更改为 confluent_kafka
库中的 Avro 序列化程序,那么您需要提供 schema
和 payload
作为您自己的每条消息
例如
message = { 'schema': { ... },
'payload': {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
}
更多信息:Kafka Connect Deep Dive – Converters and Serialization Explained
如何在 aiokafka
中为 json 消息添加架构?没有它,Kafka Connect 将无法工作。
import asyncio
import json
import random
import aiokafka
from faker import Faker
def serializer(value):
return json.dumps(value).encode()
async def produce():
fake = Faker(['ru_RU'])
producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serializer)
await producer.start()
try:
while True:
message = {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
await producer.send("mytopic", message)
finally:
await producer.stop()
asyncio.run(produce())
Kafka Connect cannot work without it
它 可以 如果你有 value.converter.schemas.enable=false
。是否需要模式取决于特定的连接器,而不是连接本身。
如果您无法更改您的序列化程序,例如,更改为 confluent_kafka
库中的 Avro 序列化程序,那么您需要提供 schema
和 payload
作为您自己的每条消息
例如
message = { 'schema': { ... },
'payload': {
'name': fake.first_name(),
'surname': fake.last_name(),
'age': random.randint(20, 30)
}
}
更多信息:Kafka Connect Deep Dive – Converters and Serialization Explained