如何使 kafka-python 或 pykafka 与 uwsgi 和 gevent 一起作为异步生产者工作?
How to make kafka-python or pykafka work as an async producer with uwsgi and gevent?
我的 Stack 是带有 gevents 的 uwsgi。我试图用装饰器包装我的 api 端点,以将所有请求数据(url、方法、正文和响应)推送到 kafka 主题,但它不起作用。我的理论是因为我正在使用 gevents,并且我试图在异步模式下 运行 这些,实际推送到 kafka 的异步线程无法 运行 使用 gevents。如果我尝试使方法同步,那么它也不起作用,它在生产工人中死亡,即在生产调用之后永远不会 returns。尽管这两种方法在 python shell 上都很好,如果我在线程上 运行 uwsgi
示例代码如下:
1. 使用 kafka-python(异步)
try:
kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
except NoBrokersAvailable:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
kafka_producer = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not kafka_producer:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
return
data = json.dumps(message)
try:
start = time.time()
kafka_producer.send(topic, key=str(key), value=data)
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except KafkaTimeoutError as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.info(e)
pass
except Exception as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.exception(e)
pass
与 py-kafka(同步):
try:
client = KafkaClient(hosts=KAFKAHOST)
except Exception as e:
logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST))
client = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not client:
logger.info(u'Kafka Host is None')
return
data = json.dumps(message)
try:
start = time.time()
topic = client.topics[topic]
with topic.get_sync_producer() as producer:
producer.produce(data, partition_key='{}'.format(key))
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except Exception as e:
logger.exception(e)
pass
我对 pykafka 有更多的经验,所以我可以回答那个部分。 pykafka 使用可插入的线程处理程序,并且内置了 gevent 支持。您需要使用 use_greenlets=True
实例化 KafkaClient。文档 here
关于您的方法的其他想法。为每条消息创建一个新的主题对象和生产者是非常昂贵的。最好创建一次并重复使用。
# setup once
client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True)
topic = client.topics[topic]
producer = topic.get_sync_producer()
def send_message_to_kafka(producer, key, message):
"""
:param producer: pykafka producer
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
data = json.dumps(message)
try:
start = time.time()
producer.produce(data, partition_key='{}'.format(key))
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except Exception as e:
logger.exception(e)
pass # for at least once delivery you will need to catch network errors and retry.
最后,kafka 的所有速度都来自批处理和压缩。使用同步生产者可以防止客户端利用这些功能。它会工作,但速度较慢并且使用更多 space。一些应用程序需要同步,但如果您遇到性能瓶颈,重新考虑您的应用程序以批处理消息可能是有意义的。
我的 Stack 是带有 gevents 的 uwsgi。我试图用装饰器包装我的 api 端点,以将所有请求数据(url、方法、正文和响应)推送到 kafka 主题,但它不起作用。我的理论是因为我正在使用 gevents,并且我试图在异步模式下 运行 这些,实际推送到 kafka 的异步线程无法 运行 使用 gevents。如果我尝试使方法同步,那么它也不起作用,它在生产工人中死亡,即在生产调用之后永远不会 returns。尽管这两种方法在 python shell 上都很好,如果我在线程上 运行 uwsgi
示例代码如下: 1. 使用 kafka-python(异步)
try:
kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
except NoBrokersAvailable:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
kafka_producer = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not kafka_producer:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
return
data = json.dumps(message)
try:
start = time.time()
kafka_producer.send(topic, key=str(key), value=data)
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except KafkaTimeoutError as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.info(e)
pass
except Exception as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.exception(e)
pass
与 py-kafka(同步):
try: client = KafkaClient(hosts=KAFKAHOST) except Exception as e: logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST)) client = None def send_message_to_kafka(topic, key, message): """ :param topic: topic name :param key: key to decide partition :param message: json serializable object to send :return: """ if not client: logger.info(u'Kafka Host is None') return data = json.dumps(message) try: start = time.time() topic = client.topics[topic] with topic.get_sync_producer() as producer: producer.produce(data, partition_key='{}'.format(key)) logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) except Exception as e: logger.exception(e) pass
我对 pykafka 有更多的经验,所以我可以回答那个部分。 pykafka 使用可插入的线程处理程序,并且内置了 gevent 支持。您需要使用 use_greenlets=True
实例化 KafkaClient。文档 here
关于您的方法的其他想法。为每条消息创建一个新的主题对象和生产者是非常昂贵的。最好创建一次并重复使用。
# setup once
client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True)
topic = client.topics[topic]
producer = topic.get_sync_producer()
def send_message_to_kafka(producer, key, message):
"""
:param producer: pykafka producer
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
data = json.dumps(message)
try:
start = time.time()
producer.produce(data, partition_key='{}'.format(key))
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except Exception as e:
logger.exception(e)
pass # for at least once delivery you will need to catch network errors and retry.
最后,kafka 的所有速度都来自批处理和压缩。使用同步生产者可以防止客户端利用这些功能。它会工作,但速度较慢并且使用更多 space。一些应用程序需要同步,但如果您遇到性能瓶颈,重新考虑您的应用程序以批处理消息可能是有意义的。