在 multiprocessing.Queue 中更改缓冲区大小
Changing the Buffer size in multiprocessing.Queue
所以我有一个系统,生产者和消费者通过无限大小的队列连接,但是如果消费者重复调用 get 直到抛出 Empty 异常,它不会清除队列。
我认为这是因为一旦套接字缓冲区已满,消费者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会一直等到缓冲区有 space ,但是,消费者有可能调用 get "too fast" ,因此它认为队列是空的,而实际上另一端的线程有更多的数据要发送,但不能足够快地序列化它以防止套接字对消费者来说是空的。
我相信如果我可以更改底层套接字上的缓冲区大小(我是 windows),这个问题会得到缓解。据我所知,我需要做的是:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
如果我执行上述操作,是否意味着当 multirprocssing 初始化队列时它将使用我在已导入的 multiprocessing.connections 版本中设置的新缓冲区大小?对吗?
另外我相信这只会影响 windows,因为 BUFSIZE 没有在 linux 机器上使用,因为它们的所有套接字默认设置为 60 KB?
有人试过这个吗?这会对 windows 产生副作用吗? windows 上套接字缓冲区大小的基本限制是什么?
===================演示代码示例===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
这个示例,当 运行 在 windows 时,通常只会从队列中拉出大约 160,000 个项目,因为主线程可以比供应商重新填充缓冲区更快地清空缓冲区,并最终当缓冲区为空并报告它为空时,它会尝试从队列中拉出。
理论上,您可以通过设置更大的缓冲区大小来改善此问题。我相信,在 windows 系统上,顶部的两行会增加管道的默认缓冲区大小。
如果您在其中评论它们,那么此脚本将在退出前提取更多数据,因为它具有更高的 .我的主要问题是:
1)这真的有效吗?
2) 有没有办法让这段代码在 windows 和 linux 中使用相同大小的底层缓冲区
3) 为管道设置大缓冲区大小是否有任何意想不到的副作用。
我知道,一般来说,没有办法知道您是否已经从队列中提取了所有数据(- 假设供应商 运行 是永久性的并且生产数据非常不均衡),但是我正在寻找尽最大努力改进它的方法。
更新:
有用的link of Windows Pipe 供以后需要的人使用(link由OP提供,phil_20686):
https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx
原文:
BUFSIZE 仅在平台为 win32 时有效。
multiprocessing.Queue建立在Pipe之上,如果你改变BUFSIZE,你生成的Queue将使用更新后的值。见下文:
class Queue(object):
def __init__(self, maxsize=0):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = Pipe(duplex=False)
当平台为win32时,Pipe代码会调用如下代码:
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
openmode = win32.PIPE_ACCESS_DUPLEX
access = win32.GENERIC_READ | win32.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
openmode = win32.PIPE_ACCESS_INBOUND
access = win32.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
address, openmode,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
可以看到当duplex
为False时,outbuffer size为0,inbuffer size为BUFSIZE。
inbuffer 是为输入缓冲区保留的字节数。 2**16=65536,是一个操作可以不阻塞写入的最大字节数,但是buffer size的容量因系统而异,即使是同一个系统也不同,所以不好说将管道设置为最大数量时的效果。
所以我有一个系统,生产者和消费者通过无限大小的队列连接,但是如果消费者重复调用 get 直到抛出 Empty 异常,它不会清除队列。
我认为这是因为一旦套接字缓冲区已满,消费者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会一直等到缓冲区有 space ,但是,消费者有可能调用 get "too fast" ,因此它认为队列是空的,而实际上另一端的线程有更多的数据要发送,但不能足够快地序列化它以防止套接字对消费者来说是空的。
我相信如果我可以更改底层套接字上的缓冲区大小(我是 windows),这个问题会得到缓解。据我所知,我需要做的是:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
如果我执行上述操作,是否意味着当 multirprocssing 初始化队列时它将使用我在已导入的 multiprocessing.connections 版本中设置的新缓冲区大小?对吗?
另外我相信这只会影响 windows,因为 BUFSIZE 没有在 linux 机器上使用,因为它们的所有套接字默认设置为 60 KB?
有人试过这个吗?这会对 windows 产生副作用吗? windows 上套接字缓冲区大小的基本限制是什么?
===================演示代码示例===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
这个示例,当 运行 在 windows 时,通常只会从队列中拉出大约 160,000 个项目,因为主线程可以比供应商重新填充缓冲区更快地清空缓冲区,并最终当缓冲区为空并报告它为空时,它会尝试从队列中拉出。
理论上,您可以通过设置更大的缓冲区大小来改善此问题。我相信,在 windows 系统上,顶部的两行会增加管道的默认缓冲区大小。
如果您在其中评论它们,那么此脚本将在退出前提取更多数据,因为它具有更高的 .我的主要问题是: 1)这真的有效吗? 2) 有没有办法让这段代码在 windows 和 linux 中使用相同大小的底层缓冲区 3) 为管道设置大缓冲区大小是否有任何意想不到的副作用。
我知道,一般来说,没有办法知道您是否已经从队列中提取了所有数据(- 假设供应商 运行 是永久性的并且生产数据非常不均衡),但是我正在寻找尽最大努力改进它的方法。
更新:
有用的link of Windows Pipe 供以后需要的人使用(link由OP提供,phil_20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx
原文:
BUFSIZE 仅在平台为 win32 时有效。
multiprocessing.Queue建立在Pipe之上,如果你改变BUFSIZE,你生成的Queue将使用更新后的值。见下文:
class Queue(object):
def __init__(self, maxsize=0):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = Pipe(duplex=False)
当平台为win32时,Pipe代码会调用如下代码:
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
openmode = win32.PIPE_ACCESS_DUPLEX
access = win32.GENERIC_READ | win32.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
openmode = win32.PIPE_ACCESS_INBOUND
access = win32.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
address, openmode,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
可以看到当duplex
为False时,outbuffer size为0,inbuffer size为BUFSIZE。
inbuffer 是为输入缓冲区保留的字节数。 2**16=65536,是一个操作可以不阻塞写入的最大字节数,但是buffer size的容量因系统而异,即使是同一个系统也不同,所以不好说将管道设置为最大数量时的效果。