如何使用 python 在 kafka 消费者中聚合 json 数据?
How to aggregate json data in kafka consumer using python?
我在 KAFKA Transactions 主题中生成的数据如下:
ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
目前编写的消费者代码为:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
print (message)
我想要像 (account_number, sum(amount)) 这样的元组输出,我该如何实现?
我认为字典对分组数据可能比元组更有用。 defaultdict
很适合这个过程
from collections import defaultdict
accounts = defaultdict(int)
for message in consumer:
payload = message.value
account = payload['account_number']
amount = payload['amount']
accounts[account] += amount
print(accounts)
defaultdict(<class 'int'>,{
"14": 263.75,
"12": 766.95
})
要获取您可能正在寻找的元组,您可以在循环后迭代 accounts.items()
:
for info in accounts.items():
print(info)
("14", 263.75)
("12", 766.95)
我在 KAFKA Transactions 主题中生成的数据如下:
ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)
ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95} , headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
目前编写的消费者代码为:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
print (message)
我想要像 (account_number, sum(amount)) 这样的元组输出,我该如何实现?
我认为字典对分组数据可能比元组更有用。 defaultdict
很适合这个过程
from collections import defaultdict
accounts = defaultdict(int)
for message in consumer:
payload = message.value
account = payload['account_number']
amount = payload['amount']
accounts[account] += amount
print(accounts)
defaultdict(<class 'int'>,{
"14": 263.75,
"12": 766.95
})
要获取您可能正在寻找的元组,您可以在循环后迭代 accounts.items()
:
for info in accounts.items():
print(info)
("14", 263.75)
("12", 766.95)