Forced Slowdown of Multiprocessing Generation vs OSError Too Many Open Files
I have the code below. In testing, I found that once I reach several hundred concurrent child processes (around 400?), I get "OSError: Too Many Open Files". Any idea why?
I can work around the problem with a time.sleep(.005) call, but I shouldn't have to.
This is part of a larger program. A typical call supplies a server string, a token string, and a list containing thousands of devices. For the REST API call being used, the server can only handle one device at a time. In testing, that resulted in a 20-minute execution time, but there are indications that a multiprocessing approach could reduce it to around 30 seconds.
import urllib, requests, json, sys, pprint, time, multiprocessing as mp

assert sys.version_info >= (3, 6), "Must use Python 3.6+"

###########################
### handler function for multiprocessing worker
###########################
def getAttributesOneDevice(server, device, token, q):
    """Handler function for getting a single device"""
    serverURL = server + "/ServicesAPI/API/V1/CMDB/Devices/Attributes"
    headers = { "Accept" : "application/json",
                "Content-Type" : "application/json",
                "token" : token }
    query = { "hostname" : device }
    response = requests.get(serverURL, headers = headers, params = query, verify = False)
    q.put(response.json())
# end getAttributesOneDevice()

def GetDeviceAttributes(server = "", token = "", devices = []):
    """
    See this URL for explanation of what this function does
    https://github.com/NetBrainAPI/NetBrain-REST-API-V8.02/blob/master
        /REST APIs Documentation/Devices Management
        /Get Device Attributes API.md
    To summarize the URL: will acquire detailed device attributes for a single
    device.
    This subroutine therefore queries for all devices provided, and assembles
    the results into a single list of dicts.
    Server queries are relatively expensive. A single query is not a big deal,
    but accumulated across a massive device list this can take excessive
    time to execute (20min, etc). Therefore, this procedure is parallelized
    through multi-processing to complete in a reasonable amount of time.
    'server' should be a string that is just the http(s)://<FQDN>. Do not
        include the trailing '/'.
    'token' should be an authentication token that was generated by the
        GetLoginToken and SetWorkingDomain modules in this directory.
    'devices' should be a list of strings, where each entry is a device.
    Returns a single dictionary:
        "Table"  a list of dicts, each dict the detailed attributes of a device
        "Missed" a list of devices that had no result
    Note that failure to capture a device is distinct from function failure.
    """
    resultsTable = []
    MissedDevices = []
    procList = []
    for dev in devices:
        q = mp.Queue()
        proc = mp.Process(target=getAttributesOneDevice,
                          args=(server, dev, token, q))
        proc.start()
        procList += [ {"proc" : proc, "dev" : dev, "queue" : q} ]
        # If I don't do this as I'm going, I *always* get "OSError too many open files"
        updatedProcList = []
        for proc in procList:
            if proc["proc"].is_alive():
                updatedProcList += [proc]
            else:
                # kill zombies
                if proc["queue"].empty():
                    MissedDevices += [ proc["dev"] ]
                else:
                    queueData = proc["queue"].get()
                    resultsTable += [ queueData ]
                    while not proc["queue"].empty():
                        # drain whatever's left before we close out the process
                        proc["queue"].get()
                proc["proc"].join()
        procList = updatedProcList
        # if I don't do this, I get "OSError too many open files" at somewhere
        # around 375-400 child processes
        time.sleep(.005)

    # I could instead embed the list comprehension in the while statement,
    # but that would hinder troubleshooting
    remainingProcs = [ 1 ]
    while len(remainingProcs) > 0:
        remainingProcs = [ proc for proc in procList if proc["proc"].is_alive() ]
        time.sleep(1)

    for proc in procList:
        # kill zombies
        if proc["queue"].empty():
            MissedDevices += [ proc["dev"] ]
        else:
            queueData = proc["queue"].get()
            resultsTable += [ queueData ]
            while not proc["queue"].empty():
                # drain whatever's left before we close out the process
                proc["queue"].get()
        proc["proc"].join()

    return { "Table" : resultsTable, "Missed" : MissedDevices }
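As for the why: every mp.Process carries its own pipes and synchronization handles, and every mp.Queue holds a pipe of its own, so a few hundred live children can exhaust a typical per-process file-descriptor limit of 1024. A minimal way to inspect that limit (a sketch assuming a POSIX system, where the stdlib resource module is available):

import resource

# (soft, hard) per-process file-descriptor limits; on many Linux systems
# the soft limit defaults to 1024. With several descriptors held per live
# child, ~350-400 concurrent children is right where that limit bites.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"fd soft limit: {soft}, hard limit: {hard}")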
You should use multithreading with a thread pool (which can easily handle up to 500 threads or so), since getAttributesOneDevice spends almost all of its time waiting for a network request to complete. You should also use a requests.Session object to perform the GET requests because, according to the documentation:
The Session object allows you to persist certain parameters across requests. It also persists cookies across all requests made from the Session instance, and will use urllib3’s connection pooling. So if you’re making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase (see HTTP persistent connection).
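One caveat worth noting: a Session's default urllib3 pool keeps only 10 connections per host, so with hundreds of threads most connections will not actually be reused. A sketch of enlarging the pool via requests' HTTPAdapter (the pool sizes here are illustrative, not tuned values from this thread):

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Match pool_maxsize to the number of worker threads so connections can
# actually be kept and reused rather than created and discarded:
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=500)
session.mount("https://", adapter)
session.mount("http://", adapter)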
The worker function getAttributesOneDevice should be modified to raise an exception when it fails to capture a device.
import requests, sys
from multiprocessing.pool import ThreadPool
from functools import partial

assert sys.version_info >= (3, 6), "Must use Python 3.6+"

###########################
### handler function for multithreading worker
###########################
def getAttributesOneDevice(session, serverURL, token, device):
    """Handler function for getting a single device"""
    query = { "hostname" : device }
    response = session.get(serverURL, params = query, verify = False)
    # Raise an exception if unable to capture a device
    response.raise_for_status()
    # Should the response itself be checked to ensure a device was captured
    # and an exception be raised if not?
    return response.json()

def GetDeviceAttributes(server = "", token = "", devices = []):
    """
    See this URL for explanation of what this function does
    https://github.com/NetBrainAPI/NetBrain-REST-API-V8.02/blob/master
        /REST APIs Documentation/Devices Management
        /Get Device Attributes API.md
    To summarize the URL: will acquire detailed device attributes for a single
    device.
    This subroutine therefore queries for all devices provided, and assembles
    the results into a single list of dicts.
    Server queries are relatively expensive. A single query is not a big deal,
    but accumulated across a massive device list this can take excessive
    time to execute (20min, etc). Therefore, this procedure is parallelized
    through multi-threading to complete in a reasonable amount of time.
    'server' should be a string that is just the http(s)://<FQDN>. Do not
        include the trailing '/'.
    'token' should be an authentication token that was generated by the
        GetLoginToken and SetWorkingDomain modules in this directory.
    'devices' should be a list of strings, where each entry is a device.
    Returns a single dictionary:
        "Table"  a list of dicts, each dict the detailed attributes of a device
        "Missed" a list of devices that had no result
    Note that failure to capture a device is distinct from function failure.
    """
    with requests.Session() as session, \
         ThreadPool(min(len(devices), 500)) as pool:
        session.headers = { "Accept" : "application/json",
                            "Content-Type" : "application/json",
                            "token" : token }
        # Compute this once here:
        serverURL = server + "/ServicesAPI/API/V1/CMDB/Devices/Attributes"
        # The session, serverURL and token arguments never vary:
        worker = partial(getAttributesOneDevice, session, serverURL, token)
        resultsTable = []
        MissedDevices = []
        # imap yields results in submission order, so device_index always
        # points at the device that produced the current result or exception:
        results = pool.imap(worker, devices)
        device_index = 0
        while True:
            try:
                resultsTable.append(next(results))
            except StopIteration:
                break
            except:
                # This is the device that caused the exception.
                # The assumption is that devices is indexable:
                MissedDevices.append(devices[device_index])
            finally:
                device_index += 1
        return { "Table" : resultsTable, "Missed" : MissedDevices }
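For illustration, a hypothetical invocation (the server, token, and device names below are placeholders, not values from the original post):

if __name__ == "__main__":
    # placeholder arguments for illustration only
    out = GetDeviceAttributes(server = "https://netbrain.example.com",
                              token = "<auth-token>",
                              devices = ["core-rtr-01", "edge-sw-02"])
    print(len(out["Table"]), "devices captured,", len(out["Missed"]), "missed")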
Thanks to @Booboo and @Olvin-Roght for the help. I've marked Booboo's answer as "accepted", but in the end it was a combination of the two. For transparency, and in case it helps someone else in the future, this is what I actually ended up with; I got there purely from the pointers people gave in this thread. Thank you all.
import requests, json, sys, concurrent.futures

# tested on 3.6.8
assert sys.version_info >= (3, 6), "Must use Python 3.6+"

# needed to REALLY condense the letters in the function name for submit()
def get1Dev(server, device, token):
    """Handler function for getting a single device. Raises an exception on an
    unsuccessful query, or returns the dict of the detailed attributes table."""
    serverURL = server + "/ServicesAPI/API/V1/CMDB/Devices/Attributes"
    query = { "hostname" : device }
    headers = {
        "Accept" : "application/json",
        "Content-Type" : "application/json",
        "token" : token
    }
    # exceptions raised here are captured by the executor and re-raised
    # when the parent calls result() on the corresponding future
    response = requests.get(serverURL, headers=headers, params=query, verify=False)
    if response.status_code != 200:
        raise Exception(str(response.status_code) + " returned from server")
    responseStatusDescription = response.json()["statusDescription"]
    if "Success." not in responseStatusDescription:
        raise Exception(f"{responseStatusDescription} returned from server")
    return response.json()["attributes"]
# end get1Dev()

def GetDeviceAttributes(server = "", token = "", devices = []):
    """
    See this URL for explanation of what this function does
    https://github.com/NetBrainAPI/NetBrain-REST-API-V8.02/blob/master
        /REST APIs Documentation/Devices Management
        /Get Device Attributes API.md
    To summarize the URL: will acquire detailed device attributes for a single
    device.
    This subroutine therefore queries for all devices provided, and assembles
    the results into a single list of dicts.
    Server queries are relatively expensive. A single query is not a big deal,
    but accumulated across a massive device list this can take excessive
    time to execute (20min, etc). Therefore, this procedure is parallelized
    through multi-processing to complete in a reasonable amount of time.
    'server' should be a string that is just the http(s)://<FQDN>. Do not
        include the trailing '/'.
    'token' should be an authentication token that was generated by the
        GetLoginToken and SetWorkingDomain modules in this directory.
    'devices' should be a list of strings, where each entry is a device.
    Returns a single dictionary:
        "Table"  a list of dicts, each dict the detailed attributes of a device
        "Missed" a list of devices that had no result
    Note that failure to capture a device is distinct from function failure.
    """
    # will raise() if needed; purposefully not including in a "try" block
    inputValidate(server, token, devices)
    # remove all empty strings in 'devices'
    devices = [ dev for dev in devices if dev ]
    resultsTable = []
    MissedDevices = []
    # profiling data for max_workers, 06 Feb 2022:
    #   single-threaded single-processed: 20min (ish)
    #   61 is default, took 6min
    #   10000 resulted in "too many open files" error
    #   detailed analysis: "too many open files" occurs ~350-400 child procs
    #   300 took 40sec, so we'll call that the "sweet spot"
    # the with-statement guarantees the executor is shut down when done
    with concurrent.futures.ProcessPoolExecutor(max_workers=300) as exe:
        # exe.map() is too trivial in that it halts on the first exception :(
        # derived from the example for ThreadPoolExecutor at
        # https://docs.python.org/3/library/concurrent.futures.html
        results = {exe.submit(get1Dev, server, dev, token): dev for dev in devices}
        for result in concurrent.futures.as_completed(results, timeout=300):
            try:
                # separating the call to result() from the += allows exception
                # handling without worrying about adding an error result to
                # resultsTable
                res = result.result()
                resultsTable += [ res ]
            # need to catch both exceptions
            except (Exception, concurrent.futures.TimeoutError):
                # 'results' maps each future back to its device, so this
                # recovers the device that failed
                MissedDevices += [ results[result] ]
    return { "Table" : resultsTable, "Missed" : MissedDevices }
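Since the workload is network-bound rather than CPU-bound, concurrent.futures.ThreadPoolExecutor is a near drop-in alternative that avoids the per-process descriptor overhead entirely. A sketch of the swap (untested against the original environment; the rest of the as_completed() loop is unchanged):

# threads share one process, so no extra pipes or semaphores per worker
with concurrent.futures.ThreadPoolExecutor(max_workers=300) as exe:
    results = {exe.submit(get1Dev, server, dev, token): dev for dev in devices}
    # ... same as_completed() loop as above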