Python 性能 - 最佳并行方法
Python performance - best parallelism approach
我正在实施一个 Python 脚本,该脚本需要在每个不到 5 秒的时间内并行发送 1500 多个数据包。
简而言之,我需要的是:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
for x in list[:1500]:
send_pkts(x)
time.sleep(randint(1,5))
我尝试了简单的单线程、多线程、多处理和多处理+多线程形式,但遇到了以下问题:
- 简单的单线程:
“for 延迟”似乎损害了“5 秒”的依赖性。
- 多线程:
由于 Python GIL 限制,我想我无法完成我想要的。
- 多处理:
这似乎是最有效的方法。但是,由于进程过多,我所在的 VM 运行 脚本冻结(当然,1500 个进程 运行)。因此变得不切实际。
- 多处理+多线程:
在这种方法中,我创建了更少的进程,每个进程调用一些线程(假设:10 个进程每个调用 150 个线程)。很明显,VM 的冻结速度不如方法 3,但是我能达到的最多 "concurrent packet sending" 是 ~800。 GIL 限制?虚拟机限制?
在这次尝试中,我也尝试使用进程池,但结果相似。
我可以使用更好的方法来完成这项任务吗?
[1] 编辑 1:
def send_pkt(x):
#craft pkt
while True:
#send pkt
gevent.sleep(0)
gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])
[2] 编辑 2(gevent 猴子修补):
from gevent import monkey; monkey.patch_all()
jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]
但是我收到以下错误:"ValueError: filedescriptor out of range in select()"。所以我检查了我的系统 ulimit(Soft 和 Hard 都是最大值:65536)。
之后,我检查了它与 select() 限制有关 Linux (最大 1024 fds)。请检查:http://man7.org/linux/man-pages/man2/select.2.html (BUGS section) - In orderto overcome that I should use poll() (http://man7.org/linux/man-pages/man2/poll.2.html)。但是使用 poll() 我 return 有相同的限制:因为轮询是 "blocking approach".
此致,
您在选项 3 中的结果:"due to excessive quantity of process the VM where I am running the script freezes (of course, 1500 process running)" 需要进一步调查。我认为,根据目前收集到的信息,可能无法确定这是多处理方法的缺点还是 VM 的局限性更好。
一种相当简单直接的方法是 运行 缩放实验:与其让所有发送都来自单个进程或所有发送都来自同一进程,不如尝试中间值。计算将工作负载在两个进程(或 4、8 等)之间分成两半所需的时间。
在这样做的同时,运行 一个像 Windows 上的 xperf
或 Linux 上的 oprofile
这样的工具也可能是一个好主意,以记录是否这些不同的并行性选择导致了不同类型的瓶颈,例如抖动 CPU 缓存,运行 使 VM 内存不足,或者谁知道还有什么。最简单的方法就是尝试一下。
根据之前处理这些类型问题的经验和一般经验法则,我预计当多处理进程的数量小于或等于可用的 CPU 核心数量时,性能最佳(在 VM 本身或管理程序上)。然而,这是假设问题是 CPU 绑定的;如果在数据包发送过程中发生阻塞,如果与其他阻塞操作交错,可以更好地利用 CPU 时间,那么超过 #cpu 个进程的性能可能仍然会更高。不过,在完成一些分析 and/or 缩放实验之前,我们不知道。
你是正确的 python 是单线程的,但是你想要的任务(发送网络数据包)被认为是 IO 绑定操作,因此是多线程的一个很好的候选者。只要您在编写代码时考虑到异步,您的主线程在传输数据包时并不忙。
查看有关异步 tcp 网络的 python 文档 - https://docs.python.org/3/library/asyncio-protocol.html#tcp-echo-client。
在 Python 中使用并行性时,一个好的方法是使用 ThreadPoolExecutor 或 ProcessPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
根据我的经验,这些效果很好。
可以根据您的使用进行调整的 threadedPoolExecutor 示例。
import concurrent.futures
import urllib.request
import time
IPs= ['168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204']
def send_pkt(x):
status = 'Failed'
while True:
#send pkt
time.sleep(10)
status = 'Successful'
break
return status
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip, exc))
else:
print('%r send %s' % (url, data))
如果瓶颈是基于 http(“发送数据包”),那么 GIL 实际上应该不是什么大问题。
如果 python 中也有计算发生,那么 GIL 可能会妨碍,正如您所说,基于进程的并行性将是首选。
您不需要每个任务一个进程!这似乎是你思想上的疏忽。使用 python 的 Pool
class,您可以轻松创建一组将从队列接收任务的工作人员。
import multiprocessing
def send_pkts(ip):
...
number_of_workers = 8
with multiprocessing.Pool(number_of_workers) as pool:
pool.map(send_pkts, list[:1500])
你现在是运行number_of_workers + 1
个进程(工人+原来的进程),N个工人运行同时send_pkts
函数
阻碍您实现预期性能的主要问题是 send_pkts()
方法。它不仅发送数据包,还制作数据包:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
虽然发送数据包几乎肯定是一项 I/O 绑定任务,但制作数据包几乎肯定是一项 CPU 绑定任务。该方法需要拆分成两个任务:
- 制作一个数据包
- 发送数据包
我已经编写了一个基本的套接字服务器和一个客户端应用程序,用于制作数据包并将其发送到服务器。这个想法是有一个单独的过程来制作数据包并将它们放入队列中。有一个线程池与数据包制作过程共享队列。这些线程将数据包从队列中拉出并将它们发送到服务器。他们还将服务器的响应粘贴到另一个共享队列中,但这只是为了我自己的测试,与您尝试做的事情无关。当线程从队列中获得 None
(poison pill) 时退出。
server.py:
import argparse
import socketserver
import time
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, help="bind to host")
parser.add_argument("--port", type=int, help="bind to port")
parser.add_argument("--packet-size", type=int, help="size of packets")
args = parser.parse_args()
HOST, PORT = args.host, args.port
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
time.sleep(1.5)
data = self.request.recv(args.packet_size)
self.request.sendall(data.upper())
with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
client.py:
import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread
def get_logger():
logger = logging.getLogger("threading_example")
logger.setLevel(logging.INFO)
fh = logging.FileHandler("client.log")
fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
class PacketMaker(mp.Process):
def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
mp.Process.__init__(self)
self.result_queue = result_queue
self.max_packets = max_packets
self.packet_size = packet_size
self.num_poison_pills = num_poison_pills
self.num_packets_made = 0
self.logger = logger
def run(self):
while True:
if self.num_packets_made >= self.max_packets:
for _ in range(self.num_poison_pills):
self.result_queue.put(None, timeout=1)
self.logger.debug('PacketMaker exiting')
return
self.result_queue.put(os.urandom(self.packet_size), timeout=1)
self.num_packets_made += 1
class PacketSender(Thread):
def __init__(self, task_queue, result_queue, addr, packet_size, logger):
Thread.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.server_addr = addr
self.packet_size = packet_size
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(addr)
self.logger = logger
def run(self):
while True:
packet = self.task_queue.get(timeout=1)
if packet is None:
self.logger.debug("PacketSender exiting")
return
try:
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
except socket.error:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(self.server_addr)
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
self.result_queue.put(response)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num-packets', type=int, help='number of packets to send')
parser.add_argument('--packet-size', type=int, help='packet size in bytes')
parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
parser.add_argument('--host', type=str, help='name of host packets will be sent to')
parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
args = parser.parse_args()
logger = get_logger()
logger.info(f"starting script with args {args}")
packets_to_send = mp.Queue(args.num_packets + args.num_threads)
packets_received = q.Queue(args.num_packets)
producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
for _ in range(args.num_threads)]
start_time = time.time()
logger.info("starting workers")
for worker in senders + producers:
worker.start()
for worker in senders:
worker.join()
logger.info("workers finished")
end_time = time.time()
print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")
run.sh:
#!/usr/bin/env bash
for i in "$@"
do
case $i in
-s=*|--packet-size=*)
packet_size="${i#*=}"
shift
;;
-n=*|--num-packets=*)
num_packets="${i#*=}"
shift
;;
-t=*|--num-threads=*)
num_threads="${i#*=}"
shift
;;
-h=*|--host=*)
host="${i#*=}"
shift
;;
-p=*|--port=*)
port="${i#*=}"
shift
;;
*)
;;
esac
done
python3 server.py --host="${host}" \
--port="${port}" \
--packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" \
--num-packets="${num_packets}" \
--num-threads="${num_threads}" \
--host="${host}" \
--port="${port}"
kill "${server_pid}"
$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999
1500 packets received in 4.70330023765564 seconds
$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999
1500 packets received in 1.5025699138641357 seconds
可以通过将 client.py 中的日志级别更改为 DEBUG
来验证此结果。请注意,该脚本确实需要 4.7 秒以上的时间才能完成。使用 300 个线程时需要进行相当多的拆卸,但日志清楚地表明线程在 4.7 秒时完成处理。
对所有性能结果持保留态度。我不知道你 运行 这是什么系统。我将提供我的相关系统统计信息:
2 至强 X5550 @2.67GHz
24MB DDR3 @1333MHz
德比安 10
Python3.7.3
我会根据您的尝试解决问题:
- 简单的单线程:由于
randint(0, 3)
延迟 ,这几乎保证至少需要 1.5 x num_packets 秒
- 多线程:GIL 可能是这里的瓶颈,但这可能是因为
craft packet
部分而不是 send packet
- 多处理:每个进程至少需要一个文件描述符,因此您可能超出了用户或系统限制,但如果您 change the appropriate settings
- 多处理+多线程:这与#2 的原因相同,制作数据包可能是 CPU 绑定
经验法则是:I/O 绑定 - 使用线程,CPU 绑定 - 使用进程
我正在实施一个 Python 脚本,该脚本需要在每个不到 5 秒的时间内并行发送 1500 多个数据包。
简而言之,我需要的是:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
for x in list[:1500]:
send_pkts(x)
time.sleep(randint(1,5))
我尝试了简单的单线程、多线程、多处理和多处理+多线程形式,但遇到了以下问题:
- 简单的单线程: “for 延迟”似乎损害了“5 秒”的依赖性。
- 多线程: 由于 Python GIL 限制,我想我无法完成我想要的。
- 多处理: 这似乎是最有效的方法。但是,由于进程过多,我所在的 VM 运行 脚本冻结(当然,1500 个进程 运行)。因此变得不切实际。
- 多处理+多线程: 在这种方法中,我创建了更少的进程,每个进程调用一些线程(假设:10 个进程每个调用 150 个线程)。很明显,VM 的冻结速度不如方法 3,但是我能达到的最多 "concurrent packet sending" 是 ~800。 GIL 限制?虚拟机限制? 在这次尝试中,我也尝试使用进程池,但结果相似。
我可以使用更好的方法来完成这项任务吗?
[1] 编辑 1:
def send_pkt(x):
#craft pkt
while True:
#send pkt
gevent.sleep(0)
gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])
[2] 编辑 2(gevent 猴子修补):
from gevent import monkey; monkey.patch_all()
jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]
但是我收到以下错误:"ValueError: filedescriptor out of range in select()"。所以我检查了我的系统 ulimit(Soft 和 Hard 都是最大值:65536)。 之后,我检查了它与 select() 限制有关 Linux (最大 1024 fds)。请检查:http://man7.org/linux/man-pages/man2/select.2.html (BUGS section) - In orderto overcome that I should use poll() (http://man7.org/linux/man-pages/man2/poll.2.html)。但是使用 poll() 我 return 有相同的限制:因为轮询是 "blocking approach".
此致,
您在选项 3 中的结果:"due to excessive quantity of process the VM where I am running the script freezes (of course, 1500 process running)" 需要进一步调查。我认为,根据目前收集到的信息,可能无法确定这是多处理方法的缺点还是 VM 的局限性更好。
一种相当简单直接的方法是 运行 缩放实验:与其让所有发送都来自单个进程或所有发送都来自同一进程,不如尝试中间值。计算将工作负载在两个进程(或 4、8 等)之间分成两半所需的时间。
在这样做的同时,运行 一个像 Windows 上的 xperf
或 Linux 上的 oprofile
这样的工具也可能是一个好主意,以记录是否这些不同的并行性选择导致了不同类型的瓶颈,例如抖动 CPU 缓存,运行 使 VM 内存不足,或者谁知道还有什么。最简单的方法就是尝试一下。
根据之前处理这些类型问题的经验和一般经验法则,我预计当多处理进程的数量小于或等于可用的 CPU 核心数量时,性能最佳(在 VM 本身或管理程序上)。然而,这是假设问题是 CPU 绑定的;如果在数据包发送过程中发生阻塞,如果与其他阻塞操作交错,可以更好地利用 CPU 时间,那么超过 #cpu 个进程的性能可能仍然会更高。不过,在完成一些分析 and/or 缩放实验之前,我们不知道。
你是正确的 python 是单线程的,但是你想要的任务(发送网络数据包)被认为是 IO 绑定操作,因此是多线程的一个很好的候选者。只要您在编写代码时考虑到异步,您的主线程在传输数据包时并不忙。
查看有关异步 tcp 网络的 python 文档 - https://docs.python.org/3/library/asyncio-protocol.html#tcp-echo-client。
在 Python 中使用并行性时,一个好的方法是使用 ThreadPoolExecutor 或 ProcessPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures 根据我的经验,这些效果很好。
可以根据您的使用进行调整的 threadedPoolExecutor 示例。
import concurrent.futures
import urllib.request
import time
IPs= ['168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204',
'168.212. 226.204']
def send_pkt(x):
status = 'Failed'
while True:
#send pkt
time.sleep(10)
status = 'Successful'
break
return status
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip, exc))
else:
print('%r send %s' % (url, data))
如果瓶颈是基于 http(“发送数据包”),那么 GIL 实际上应该不是什么大问题。
如果 python 中也有计算发生,那么 GIL 可能会妨碍,正如您所说,基于进程的并行性将是首选。
您不需要每个任务一个进程!这似乎是你思想上的疏忽。使用 python 的 Pool
class,您可以轻松创建一组将从队列接收任务的工作人员。
import multiprocessing
def send_pkts(ip):
...
number_of_workers = 8
with multiprocessing.Pool(number_of_workers) as pool:
pool.map(send_pkts, list[:1500])
你现在是运行number_of_workers + 1
个进程(工人+原来的进程),N个工人运行同时send_pkts
函数
阻碍您实现预期性能的主要问题是 send_pkts()
方法。它不仅发送数据包,还制作数据包:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
虽然发送数据包几乎肯定是一项 I/O 绑定任务,但制作数据包几乎肯定是一项 CPU 绑定任务。该方法需要拆分成两个任务:
- 制作一个数据包
- 发送数据包
我已经编写了一个基本的套接字服务器和一个客户端应用程序,用于制作数据包并将其发送到服务器。这个想法是有一个单独的过程来制作数据包并将它们放入队列中。有一个线程池与数据包制作过程共享队列。这些线程将数据包从队列中拉出并将它们发送到服务器。他们还将服务器的响应粘贴到另一个共享队列中,但这只是为了我自己的测试,与您尝试做的事情无关。当线程从队列中获得 None
(poison pill) 时退出。
server.py:
import argparse
import socketserver
import time
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, help="bind to host")
parser.add_argument("--port", type=int, help="bind to port")
parser.add_argument("--packet-size", type=int, help="size of packets")
args = parser.parse_args()
HOST, PORT = args.host, args.port
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
time.sleep(1.5)
data = self.request.recv(args.packet_size)
self.request.sendall(data.upper())
with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
client.py:
import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread
def get_logger():
logger = logging.getLogger("threading_example")
logger.setLevel(logging.INFO)
fh = logging.FileHandler("client.log")
fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(fmt)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
class PacketMaker(mp.Process):
def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
mp.Process.__init__(self)
self.result_queue = result_queue
self.max_packets = max_packets
self.packet_size = packet_size
self.num_poison_pills = num_poison_pills
self.num_packets_made = 0
self.logger = logger
def run(self):
while True:
if self.num_packets_made >= self.max_packets:
for _ in range(self.num_poison_pills):
self.result_queue.put(None, timeout=1)
self.logger.debug('PacketMaker exiting')
return
self.result_queue.put(os.urandom(self.packet_size), timeout=1)
self.num_packets_made += 1
class PacketSender(Thread):
def __init__(self, task_queue, result_queue, addr, packet_size, logger):
Thread.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.server_addr = addr
self.packet_size = packet_size
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(addr)
self.logger = logger
def run(self):
while True:
packet = self.task_queue.get(timeout=1)
if packet is None:
self.logger.debug("PacketSender exiting")
return
try:
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
except socket.error:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(self.server_addr)
self.sock.sendall(packet)
response = self.sock.recv(self.packet_size)
self.result_queue.put(response)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num-packets', type=int, help='number of packets to send')
parser.add_argument('--packet-size', type=int, help='packet size in bytes')
parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
parser.add_argument('--host', type=str, help='name of host packets will be sent to')
parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
args = parser.parse_args()
logger = get_logger()
logger.info(f"starting script with args {args}")
packets_to_send = mp.Queue(args.num_packets + args.num_threads)
packets_received = q.Queue(args.num_packets)
producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
for _ in range(args.num_threads)]
start_time = time.time()
logger.info("starting workers")
for worker in senders + producers:
worker.start()
for worker in senders:
worker.join()
logger.info("workers finished")
end_time = time.time()
print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")
run.sh:
#!/usr/bin/env bash
for i in "$@"
do
case $i in
-s=*|--packet-size=*)
packet_size="${i#*=}"
shift
;;
-n=*|--num-packets=*)
num_packets="${i#*=}"
shift
;;
-t=*|--num-threads=*)
num_threads="${i#*=}"
shift
;;
-h=*|--host=*)
host="${i#*=}"
shift
;;
-p=*|--port=*)
port="${i#*=}"
shift
;;
*)
;;
esac
done
python3 server.py --host="${host}" \
--port="${port}" \
--packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" \
--num-packets="${num_packets}" \
--num-threads="${num_threads}" \
--host="${host}" \
--port="${port}"
kill "${server_pid}"
$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999
1500 packets received in 4.70330023765564 seconds
$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999
1500 packets received in 1.5025699138641357 seconds
可以通过将 client.py 中的日志级别更改为 DEBUG
来验证此结果。请注意,该脚本确实需要 4.7 秒以上的时间才能完成。使用 300 个线程时需要进行相当多的拆卸,但日志清楚地表明线程在 4.7 秒时完成处理。
对所有性能结果持保留态度。我不知道你 运行 这是什么系统。我将提供我的相关系统统计信息: 2 至强 X5550 @2.67GHz 24MB DDR3 @1333MHz 德比安 10 Python3.7.3
我会根据您的尝试解决问题:
- 简单的单线程:由于
randint(0, 3)
延迟 ,这几乎保证至少需要 1.5 x num_packets 秒
- 多线程:GIL 可能是这里的瓶颈,但这可能是因为
craft packet
部分而不是send packet
- 多处理:每个进程至少需要一个文件描述符,因此您可能超出了用户或系统限制,但如果您 change the appropriate settings
- 多处理+多线程:这与#2 的原因相同,制作数据包可能是 CPU 绑定
经验法则是:I/O 绑定 - 使用线程,CPU 绑定 - 使用进程