JSON column as Key in kafka producer
JSON column as Key in kafka producer
正如我们所知,我们可以向 kafka 生产者发送一个密钥,该密钥在内部进行哈希处理,以查找主题数据中的哪个分区。
我有一个生产者,我在其中发送 JSON 格式的数据。
kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF
现在我想在发送数据时使用 "countrycode" 作为我的密钥。
在普通分隔数据中,我们可以指定 2 个参数:
--property "parse.key=true"
--property "key.separator=:
但是发送JSON sata 时如何操作。
我正在为 Kafka 使用 confluent 的 python API 如果有任何我必须写的关于函数分类的东西来实现这个,如果你能说出来我将不胜感激根据 python.
JSON 只是一个字符串。控制台生产者不解析 JSON,只有 Avro 控制台生产者这样做。
我会避免使用 key.separator=:
,因为 JSON 包含 :
。您可以改用 |
字符,然后您只需输入
countrycode|{"your":"data"}
在Python、the produce function takes a key, yes。您可以像这样解析您的数据,以便为键提取一个值。
key = 'countrycode'
records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
{"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
]
import json
for r in records:
producer.produce('topic', key=r[key], value=json.dumps(r))
# first record will send a record containing ('IN', { ... 'countrycode':'IN'})
正如我们所知,我们可以向 kafka 生产者发送一个密钥,该密钥在内部进行哈希处理,以查找主题数据中的哪个分区。 我有一个生产者,我在其中发送 JSON 格式的数据。
kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF
现在我想在发送数据时使用 "countrycode" 作为我的密钥。 在普通分隔数据中,我们可以指定 2 个参数:
--property "parse.key=true"
--property "key.separator=:
但是发送JSON sata 时如何操作。
我正在为 Kafka 使用 confluent 的 python API 如果有任何我必须写的关于函数分类的东西来实现这个,如果你能说出来我将不胜感激根据 python.
JSON 只是一个字符串。控制台生产者不解析 JSON,只有 Avro 控制台生产者这样做。
我会避免使用 key.separator=:
,因为 JSON 包含 :
。您可以改用 |
字符,然后您只需输入
countrycode|{"your":"data"}
在Python、the produce function takes a key, yes。您可以像这样解析您的数据,以便为键提取一个值。
key = 'countrycode'
records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
{"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
]
import json
for r in records:
producer.produce('topic', key=r[key], value=json.dumps(r))
# first record will send a record containing ('IN', { ... 'countrycode':'IN'})