Why is my Kafka consumer much slower than my Kafka producer?
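I have a data stream that I am able to crawl completely. All of the data is put into Kafka and then sent on to Cassandra. Right now the Kafka consumer is very slow, much slower than the producer. I would like them to run at the same rate. What can I do to achieve that, or what is wrong with my code?
Here is my Kafka consumer code in Python: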
import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta
from cassandra import ConsistencyLevel
from dateutil.parser import parse

logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))

log = logging.getLogger('consumer')

try:
    cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
    while True:
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist)>0:
                    article_rows = session.execute(article_lookup_stmt,[idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if not article['id'] in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
                                log.debug('%s %s' % (article['id'],article['created_at']))
                            session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:',e
                            continue
            except Exception as e:
                print 'error is:',e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)
Edit:
I have also added my profiling results. The slow line of code seems to be:
article_rows = session.execute(article_lookup_stmt,[idlist])
Sun Feb 14 16:01:01 2016    consumer.out

395793 function calls (394232 primitive calls) in 23.074 seconds

Ordered by: internal time

     ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        141   10.695    0.076   10.695    0.076 {select.select}
       7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
          1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
       1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
         38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
         13    0.078    0.006    0.078    0.006 {time.sleep}
       2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
      22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
          3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
  2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
       1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
          3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
         33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
      15374    0.024    0.000    0.024    0.000 {built-in method __new__ of type object at 0x788ee0}
        141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
        288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
       2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
        115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
       2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
      24548    0.017    0.000    0.017    0.000 {_struct.unpack}
44228/43959    0.016    0.000    0.016    0.000 {len}
Thanks for your replies.
You could try running the consumer without saving to C* (Cassandra), so you can observe how much of a difference that makes.
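A minimal sketch of that comparison, assuming the per-message work can be pulled out into a handler function. The names `measure_throughput`, `no_op`, and `slow_write` are hypothetical, and the 1 ms sleep only stands in for the Cassandra round trips; it is not a measured value.

```python
import time


def measure_throughput(messages, handle):
    """Run handle() on every message and return messages per second."""
    start = time.time()
    count = 0
    for message in messages:
        handle(message)
        count += 1
    elapsed = time.time() - start
    # Guard against a zero reading on clocks with coarse resolution.
    return count / elapsed if elapsed > 0 else float('inf')


def no_op(message):
    """Consume the message and do nothing with it (no Cassandra writes)."""
    pass


def slow_write(message):
    """Hypothetical stand-in for the Cassandra writes: sleep for 1 ms."""
    time.sleep(0.001)


if __name__ == '__main__':
    batch = ['{"body": "{}"}'] * 200
    print('without writes: %.0f msg/s' % measure_throughput(batch, no_op))
    print('with writes   : %.0f msg/s' % measure_throughput(batch, slow_write))
```

In the real consumer, the same idea would mean timing the loop once with the `session.execute` calls in place and once with them commented out. A large gap between the two numbers would point at Cassandra rather than Kafka.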
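If saving to C* turns out to be the bottleneck (which I assume it is), you could have a pool of threads (larger than the 16 your consumer spawns) whose sole responsibility is writing to C*.
That way you offload the slow part of the code, leaving only the trivial work in the consumer loop.
You can use from multiprocessing import Pool. More here.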
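A rough sketch of the pool idea, under some assumptions. It uses `ThreadPool` from `multiprocessing.pool` (threads rather than the process-based `Pool`), on the assumption that the writes are I/O bound and that a single Cassandra session can be shared by the worker threads. `FakeSession`, `write_article`, and `consume` are hypothetical names so that the example runs on its own; in the real consumer the session and prepared statements from the question would take their place.

```python
from multiprocessing.pool import ThreadPool


class FakeSession(object):
    """Hypothetical stand-in for a Cassandra session, for illustration only."""

    def __init__(self):
        self.written = []

    def execute(self, statement, params):
        # list.append is atomic in CPython, so worker threads can share it.
        self.written.append((statement, params))


def write_article(session, article):
    """The slow part: the writes for one article (a single fake statement here)."""
    session.execute('article_insert_stmt', (article['id'], article['title']))
    return article['id']


def consume(batches, session, pool):
    """Consumer loop: hand each article to the pool instead of writing inline."""
    pending = []
    for articles in batches:
        for article in articles:
            pending.append(pool.apply_async(write_article, (session, article)))
    # Wait for every write; get() re-raises any exception raised in a worker.
    return [result.get() for result in pending]


if __name__ == '__main__':
    pool = ThreadPool(32)  # assumed size: more workers than the 16 consumers
    session = FakeSession()
    batches = [[{'id': i, 'title': 'article %d' % i} for i in range(5)]]
    ids = consume(batches, session, pool)
    pool.close()
    pool.join()
    print(sorted(ids))
```

The consumer loop only enqueues work, so it can go back to fetching from Kafka while the pool waits on Cassandra. The pool size of 32 is a guess and would need tuning against the actual cluster.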