处理 async/multithreading 中的可变共享对象。替代“.copy()”?
Handling mutable shared objects in async/multithreading. Alternative to '.copy()'?
我正在编写一个程序,创建一组工作人员使用 aiohttp
异步调用 API。然而,这个问题是关于共享对象的。我假设如果我是多线程的,我会 运行 遇到相同或类似的问题。
我有一组所有工作人员共享的默认 URL 参数,但是其中两个参数的值在工作人员之间发生了变化:
DEFAULT_PARAMS = {
'q' : None, #<==CHANGES per worker
'offset' : '0', #<==CHANGES per worker
'mkt' : 'en-US', #<==STATIC for all workers
'moreParams' : '<most of the data>' #<==STATIC for all workers
}
下面是我如何初始化 Worker()
class:
class Worker(object):
def __init__(self, q):
# this copy iexpensive when > 100 workers.
self.initial_params = DEFAULT_PARAMS.copy()
# but witout copying entire default params dict, the next line
# would add alter the 'q' value for all instances of Worker.
self.initial_params.update({'q' : q})
我正在为我创建的每个新工作人员寻找调用 DEFAULT_PARAMS.copy()
的替代方法。
弄清楚如何提出这个问题一直是一个挑战。我怀疑我的答案可能位于 class 通过实例属性的某个地方。
这是我的程序的一个非常准系统的例子:
import aiohttp
import asyncio
DEFUALT_PARAMS = {
'q' : None, #<==CHANGES per worker
'offset' : '0', #<==CHANGES per worker
'mkt' : 'en-US', #<==STATIC for all workers
'moreParams' : '<most of the data>' #<==STATIC for all workers
}
class Worker(object):
def __init__(self, q):
self.initial_params = DEFUALT_PARAMS.copy() # <==expensive
self.initial_params.update({'q' : q}) #<==without copying, overwrites ref for all classes.
async def call_api(self):
async with aiohttp.ClientSession() as sesh:
async with sesh.get(
'https://somesearchengine.com/search?',
params=self.initial_params
) as resp:
assert resp.status == 200
print(await resp.json())
async def main(workers, *, loop=None):
tasks = (asyncio.ensure_future(i.call_api(), loop=loop) for i in workers)
await asyncio.gather(*tasks)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queries = ['foo', 'bar', 'baz']
workers = (Worker(i) for i in queries)
loop.run_until_complete(main(workers, loop=loop))
复制字典,即使是 100 个工人,也不那么昂贵。您可以在 7 微秒内创建 1000 键字典的副本 并 更新它:
>>> from timeit import Timer
>>> from secrets import token_urlsafe
>>> test_dict = {token_urlsafe(): token_urlsafe() for _ in range(1000)}
>>> len(test_dict)
1000
>>> count, total = Timer("p = d.copy(); p.update({'q' : q})", "from __main__ import test_dict as d; q = 42").autorange()
>>> print(total/count*1000000) # microseconds are 10**-6 seconds
7.146239580000611
所以我想说这里真的没有问题。
但是,您实际上是在分层词典内容;每个工人只需调整一个或两个键。您可以使用 collections.ChainMap()
object 来处理分层,而不是创建副本。 ChainMap()
对象需要一个以上的字典,并且会在其中查找键,直到找到一个值。不创建副本,最顶层的字典用于在更改映射时设置值:
from collections import ChainMap
# ...
self.initial_params = ChainMap({'q': q}, DEFAULT_PARAMS)
创建 ChainMap()
对象仍然更便宜:
>>> count, total = Timer("p = ChainMap({'q': q}, d)", "from __main__ import test_dict as d; q = 42; from collections import ChainMap").autorange()
>>> print(total/count*1000000)
0.5310121239999717
所以只有半微秒。当然,这是以较慢的迭代和按键访问为代价的。这将取决于 aiohttp
如何处理这些,我建议您使用 timeit
模块进行自己的微基准测试,以衡量您的代码正在执行的实际操作的性能。
但是请注意,尝试像这样处理共享状态时总是要付出代价的,对于任何并发模型,和在实例之间共享字典总是有问题,即使没有并发。
如果 q 由工人所有,为什么不直接将其作为工人本身的实例变量。
class Worker(object):
def __init__(self, q):
self.q = q
随心所欲 q
self.q
我正在编写一个程序,创建一组工作人员使用 aiohttp
异步调用 API。然而,这个问题是关于共享对象的。我假设如果我是多线程的,我会 运行 遇到相同或类似的问题。
我有一组所有工作人员共享的默认 URL 参数,但是其中两个参数的值在工作人员之间发生了变化:
DEFAULT_PARAMS = {
'q' : None, #<==CHANGES per worker
'offset' : '0', #<==CHANGES per worker
'mkt' : 'en-US', #<==STATIC for all workers
'moreParams' : '<most of the data>' #<==STATIC for all workers
}
下面是我如何初始化 Worker()
class:
class Worker(object):
def __init__(self, q):
# this copy iexpensive when > 100 workers.
self.initial_params = DEFAULT_PARAMS.copy()
# but witout copying entire default params dict, the next line
# would add alter the 'q' value for all instances of Worker.
self.initial_params.update({'q' : q})
我正在为我创建的每个新工作人员寻找调用 DEFAULT_PARAMS.copy()
的替代方法。
弄清楚如何提出这个问题一直是一个挑战。我怀疑我的答案可能位于 class 通过实例属性的某个地方。
这是我的程序的一个非常准系统的例子:
import aiohttp
import asyncio
DEFUALT_PARAMS = {
'q' : None, #<==CHANGES per worker
'offset' : '0', #<==CHANGES per worker
'mkt' : 'en-US', #<==STATIC for all workers
'moreParams' : '<most of the data>' #<==STATIC for all workers
}
class Worker(object):
def __init__(self, q):
self.initial_params = DEFUALT_PARAMS.copy() # <==expensive
self.initial_params.update({'q' : q}) #<==without copying, overwrites ref for all classes.
async def call_api(self):
async with aiohttp.ClientSession() as sesh:
async with sesh.get(
'https://somesearchengine.com/search?',
params=self.initial_params
) as resp:
assert resp.status == 200
print(await resp.json())
async def main(workers, *, loop=None):
tasks = (asyncio.ensure_future(i.call_api(), loop=loop) for i in workers)
await asyncio.gather(*tasks)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queries = ['foo', 'bar', 'baz']
workers = (Worker(i) for i in queries)
loop.run_until_complete(main(workers, loop=loop))
复制字典,即使是 100 个工人,也不那么昂贵。您可以在 7 微秒内创建 1000 键字典的副本 并 更新它:
>>> from timeit import Timer
>>> from secrets import token_urlsafe
>>> test_dict = {token_urlsafe(): token_urlsafe() for _ in range(1000)}
>>> len(test_dict)
1000
>>> count, total = Timer("p = d.copy(); p.update({'q' : q})", "from __main__ import test_dict as d; q = 42").autorange()
>>> print(total/count*1000000) # microseconds are 10**-6 seconds
7.146239580000611
所以我想说这里真的没有问题。
但是,您实际上是在分层词典内容;每个工人只需调整一个或两个键。您可以使用 collections.ChainMap()
object 来处理分层,而不是创建副本。 ChainMap()
对象需要一个以上的字典,并且会在其中查找键,直到找到一个值。不创建副本,最顶层的字典用于在更改映射时设置值:
from collections import ChainMap
# ...
self.initial_params = ChainMap({'q': q}, DEFAULT_PARAMS)
创建 ChainMap()
对象仍然更便宜:
>>> count, total = Timer("p = ChainMap({'q': q}, d)", "from __main__ import test_dict as d; q = 42; from collections import ChainMap").autorange()
>>> print(total/count*1000000)
0.5310121239999717
所以只有半微秒。当然,这是以较慢的迭代和按键访问为代价的。这将取决于 aiohttp
如何处理这些,我建议您使用 timeit
模块进行自己的微基准测试,以衡量您的代码正在执行的实际操作的性能。
但是请注意,尝试像这样处理共享状态时总是要付出代价的,对于任何并发模型,和在实例之间共享字典总是有问题,即使没有并发。
如果 q 由工人所有,为什么不直接将其作为工人本身的实例变量。
class Worker(object):
def __init__(self, q):
self.q = q
随心所欲 q
self.q