How to produce a Tombstone Avro Record in Kafka using Python?
My sink properties:
{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}
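With "delete.enabled": "true" (which the JDBC sink only allows together with "pk.mode": "record_key"), the connector treats a record whose value is null, i.e. a tombstone, as a DELETE of the row identified by the record key; records with a non-null value are written according to "insert.mode", here an upsert. The toy in-memory model below only illustrates those semantics and is not the connector's code; the dict standing in for the Oracle table and the name `apply_sink_record` are hypothetical.

```python
from typing import Dict, Optional

# Hypothetical stand-in for the target table, keyed by the primary-key column "id".
Table = Dict[int, dict]


def apply_sink_record(table: Table, key: dict, value: Optional[dict]) -> None:
    """Toy model of upsert + delete.enabled semantics (not the connector's code)."""
    pk = key["id"]  # pk.mode=record_key, pk.fields=id: the PK comes from the key
    if value is None:
        # Tombstone: a null value means "delete the row with this key".
        table.pop(pk, None)
    else:
        # Non-null value: insert the row, or update it if the key already exists.
        table[pk] = {**value, "id": pk}


table: Table = {}
apply_sink_record(table, {"id": 1}, {"product": "myProduct", "quantity": 10, "price": 100})
print(table)  # {1: {'product': 'myProduct', 'quantity': 10, 'price': 100, 'id': 1}}
apply_sink_record(table, {"id": 1}, None)
print(table)  # {}
```

The key point is that the delete is driven by the message value being null, while the key still has to be readable so the connector knows which row to delete.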
My connect-avro-distributed.properties:
bootstrap.servers=10.0.0.0:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
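Because both key.converter and value.converter are io.confluent.connect.avro.AvroConverter, the worker expects every non-null key and value to use Confluent's Schema Registry wire format: a magic byte 0, a 4-byte big-endian schema ID, then the Avro binary payload. A null value is passed through as null, so the tombstone value itself is not the problem. The sketch below checks that header (the function name `read_confluent_header` and schema ID 1 are hypothetical) and shows why a raw JSON key such as b'{"id":1}' cannot be decoded: its first byte is `{` (0x7b), not the magic byte.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def read_confluent_header(data: bytes) -> Tuple[int, bytes]:
    """Split a Schema Registry-framed message into (schema_id, avro_payload).

    Raises ValueError if the bytes do not start with the expected 5-byte header.
    """
    if len(data) < 5:
        raise ValueError("message too short for the 5-byte header")
    # ">BI": unsigned magic byte followed by a big-endian unsigned 32-bit schema ID.
    magic, schema_id = struct.unpack(">BI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic:#04x}")
    return schema_id, data[5:]


# A correctly framed key: magic 0, schema ID 1 (hypothetical), Avro payload b"\x02".
print(read_confluent_header(b"\x00\x00\x00\x00\x01\x02"))  # (1, b'\x02')

# The key from the question is plain JSON, so the header check fails.
try:
    read_confluent_header(b'{"id":1}')
except ValueError as exc:
    print(exc)  # unknown magic byte: 0x7b
```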
I send the data like this:
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['10.0.0.0:9092'],
)
message=producer.send('orders', key=b'{"id":1}', value=None)
But it fails with an error: a serialization error.
I assume you want to produce Avro messages, so you need to serialize them properly. I'll use the confluent-kafka-python library, so if you haven't installed it yet, just run
pip install "confluent-kafka[avro]"
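To see what "serializing properly" means for the key: Avro binary encoding writes an int as a zigzag variable-length integer, and a record is just its fields in order, so the key {"id": 1} under a record schema with one int field becomes the single byte 0x02. The serializer then prefixes the magic byte and the schema ID assigned by Schema Registry. The stdlib-only sketch below reproduces that framing for illustration; `encode_zigzag_varint`, `frame_key` and schema ID 1 are hypothetical, and in practice the library and Schema Registry do this for you.

```python
import struct


def encode_zigzag_varint(n: int) -> bytes:
    """Avro binary encoding of an int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def frame_key(key: dict, schema_id: int) -> bytes:
    """Frame a {"id": int} key as: magic byte 0, 4-byte schema ID, Avro payload."""
    payload = encode_zigzag_varint(key["id"])  # the record has a single int field
    return struct.pack(">BI", 0, schema_id) + payload


print(frame_key({"id": 1}, schema_id=1))  # b'\x00\x00\x00\x00\x01\x02'
```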
Below is an example AvroProducer that sends an Avro message with a null value:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Value schema with nullable fields. It is only used when a non-None value is produced.
value_schema_str = """
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "id", "type": ["null", "int"], "default": null},
    {"name": "product", "type": ["null", "string"], "default": null},
    {"name": "quantity", "type": ["null", "int"], "default": null},
    {"name": "price", "type": ["null", "int"], "default": null}
  ]
}
"""

# Key schema: a record with a single int field "id", matching pk.fields=id
# in the JDBC sink configuration.
key_schema_str = """
{
  "type": "record",
  "name": "key_schema",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
"""


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)

    # A normal (non-tombstone) record would use a value like this:
    # value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}

    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    # The key is Avro-serialized with key_schema; value=None is not serialized
    # and is sent as a null value, i.e. a tombstone for key {"id": 1}.
    avroProducer.produce(topic='orders', key=key, value=None)
    avroProducer.flush()
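To be able to set an Avro field to null, you need to declare it as nullable in the Avro schema by adding null as one of the field's possible types.
See this example from the Avro documentation:
{
  "type": "record",
  "name": "yourRecord",
  "fields": [
    {"name": "empId", "type": "long"},
    {"name": "empName", "type": ["null", "string"]}
  ]
}
Here empId is a mandatory field, while empName is declared as either null or string, which is what allows the empName field to be set to null.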
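In Avro binary encoding a union value is written as the zero-based index of its branch (a zigzag varint) followed by that branch's value, so with "type": ["null", "string"] a null empName costs a single 0x00 byte. Keep in mind that a record with null fields is still a non-null message value; the tombstone the JDBC sink deletes on is a message whose entire value is null, as in the AvroProducer example. The stdlib-only sketch below encodes the yourRecord example; the helper names `zigzag_varint` and `encode_your_record` are hypothetical.

```python
from typing import Optional


def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_your_record(emp_id: int, emp_name: Optional[str]) -> bytes:
    """Encode {"empId": long, "empName": ["null", "string"]} in Avro binary."""
    out = zigzag_varint(emp_id)  # empId: mandatory long
    if emp_name is None:
        out += zigzag_varint(0)  # union branch 0 = "null", no payload follows
    else:
        data = emp_name.encode("utf-8")
        out += zigzag_varint(1)  # union branch 1 = "string"
        out += zigzag_varint(len(data)) + data  # string = byte length + UTF-8 bytes
    return out


print(encode_your_record(5, None))   # b'\n\x00'
print(encode_your_record(5, "Bob"))  # b'\n\x02\x06Bob'
```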