JSON column as Key in kafka producer

JSON column as Key in kafka producer

正如我们所知,我们可以向 kafka 生产者发送一个密钥,该密钥在内部进行哈希处理,以查找主题数据中的哪个分区。 我有一个生产者,我在其中发送 JSON 格式的数据。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF 
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF

现在我想在发送数据时使用 "countrycode" 作为我的密钥。 在普通分隔数据中,我们可以指定 2 个参数:

--property "parse.key=true" 
--property "key.separator=:

但是发送JSON sata 时如何操作。

我正在为 Kafka 使用 confluent 的 python API 如果有任何我必须写的关于函数分类的东西来实现这个,如果你能说出来我将不胜感激根据 python.

JSON 只是一个字符串。控制台生产者不解析 JSON,只有 Avro 控制台生产者这样做。

我会避免使用 key.separator=:,因为 JSON 包含 :。您可以改用 | 字符,然后您只需输入

countrycode|{"your":"data"}

在Python、the produce function takes a key, yes。您可以像这样解析您的数据,以便为键提取一个值。

key = 'countrycode'
records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
           {"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
]

import json
for r in records:
    producer.produce('topic', key=r[key], value=json.dumps(r))
    # first record will send a record containing ('IN', {  ... 'countrycode':'IN'})