使用 python 多处理不会将数据推送到 Kafka 队列
Data doesn't get pushed to Kafka Queue using python multiprocessing
我正在使用 Python (2.7) 多处理使用 kafka-python (1.3.5) KafkaProducer 将数据推送到 Kafka 队列。
from kafka import KafkaProducer
import multiprocessing
# other imports
class TestClass(object):
def __init__(self, producer):
self.kafka_producer = producer
def main(self, conf, nthreads):
try:
for i in range(nthreads):
logger.info("Starting process number = %d " % (i + 1))
p = Process(target=self.do_some_task, args=(conf, 2))
p.start()
processes.append(p)
for p in processes:
logger.info("Joining process")
p.join()
except Exception, ex:
logger.error("Exception occurred : %s" % str(ex))
def do_some_task(conf, retry):
# some task happening
self.record(arg1, arg2)
# pushing to kafka
def record(self, arg1, arg2)
message = json.dumps({"a": "arg1", "b": "arg2"})
self.kafka_producer.send(KAFKA_TOPIC, message)
if __name__ == '__main__':
kafka_producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
request_timeout_ms=60000,
retries=2)
obj = TestClass(kafka_producer)
try:
parser = argparse.ArgumentParser(description='Description')
parser.add_argument('-threads', type=int, default=1) # 20 threads
parser.add_argument('-debug', type=int, default=0)
args = parser.parse_args()
me = SingleInstance(args.src)
TestClass.main(CONF[args.src], args.threads)
生成了 20 个写入 kafka 的线程。我看了日志,发现进程等待消息写入kafka,但最终它继续前进,没有写入Kafka。没有例外。
我尝试了 运行 没有来自 python 命令行线程的相同代码,一切都按预期工作。可能是什么问题。
请在分叉进程后产生与 kafka 的连接。并请关闭连接,遇到连接相关错误时重新连接。
我正在使用 Python (2.7) 多处理使用 kafka-python (1.3.5) KafkaProducer 将数据推送到 Kafka 队列。
from kafka import KafkaProducer
import multiprocessing
# other imports
class TestClass(object):
def __init__(self, producer):
self.kafka_producer = producer
def main(self, conf, nthreads):
try:
for i in range(nthreads):
logger.info("Starting process number = %d " % (i + 1))
p = Process(target=self.do_some_task, args=(conf, 2))
p.start()
processes.append(p)
for p in processes:
logger.info("Joining process")
p.join()
except Exception, ex:
logger.error("Exception occurred : %s" % str(ex))
def do_some_task(conf, retry):
# some task happening
self.record(arg1, arg2)
# pushing to kafka
def record(self, arg1, arg2)
message = json.dumps({"a": "arg1", "b": "arg2"})
self.kafka_producer.send(KAFKA_TOPIC, message)
if __name__ == '__main__':
kafka_producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
request_timeout_ms=60000,
retries=2)
obj = TestClass(kafka_producer)
try:
parser = argparse.ArgumentParser(description='Description')
parser.add_argument('-threads', type=int, default=1) # 20 threads
parser.add_argument('-debug', type=int, default=0)
args = parser.parse_args()
me = SingleInstance(args.src)
TestClass.main(CONF[args.src], args.threads)
生成了 20 个写入 kafka 的线程。我看了日志,发现进程等待消息写入kafka,但最终它继续前进,没有写入Kafka。没有例外。
我尝试了 运行 没有来自 python 命令行线程的相同代码,一切都按预期工作。可能是什么问题。
请在分叉进程后产生与 kafka 的连接。并请关闭连接,遇到连接相关错误时重新连接。