Python 中的管理员如何进行读写操作?
How do read and writes work with a manager in Python?
抱歉,如果这是一个愚蠢的问题,但我无法理解经理在 python 中的工作方式。
假设我有一个管理器,其中包含要在所有进程之间共享的字典。我希望一次只有一个进程写入字典,而许多其他进程从字典中读取。
- 这是否可以同时发生,没有同步原语,或者如果 read/writes 同时发生,是否会出现问题?
- 如果我想让多个进程同时写入字典怎么办 - 允许还是会中断(我知道这可能导致竞争条件,但它会出错吗)?
- 此外,管理器是像时尚一样在队列中处理每个读写事务,一次处理一个事务,还是一次性处理所有事务?
https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes
这取决于你如何写入字典,即操作是否原子:
my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic
因此,在上面第一行代码中创建新密钥和更新现有密钥是原子操作。但是第二行代码实际上是多次操作相当于:
temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
因此,如果两个进程并行执行 my_dict[some_key] += 1
,它们可能会读取相同的 temp = my_dict[some_key]
值并将 temp
递增到相同的新值,最终效果将是字典值只会增加一次。这可以证明如下:
from multiprocessing import Pool, Manager, Lock
def init_pool(the_lock):
global lock
lock = the_lock
def worker1(d):
for _ in range(1000):
with lock:
d['x'] += 1
def worker2(d):
for _ in range(1000):
d['y'] += 1
if __name__ == '__main__':
lock = Lock()
with Manager() as manager, \
Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
d = manager.dict()
d['x'] = 0
d['y'] = 0
# worker1 will serialize with a lock
pool.apply_async(worker1, args=(d,))
pool.apply_async(worker1, args=(d,))
# worker2 will not serialize with a lock:
pool.apply_async(worker2, args=(d,))
pool.apply_async(worker2, args=(d,))
# wait for the 4 tasks to complete:
pool.close()
pool.join()
print(d)
打印:
{'x': 2000, 'y': 1162}
更新
就序列化而言:
BaseManager
创建服务器,默认使用 Linux 的套接字和 Windows 的命名管道。因此,例如,基本上您针对托管字典执行的每个方法都非常类似于通过消息传递实现的远程方法调用。这也意味着服务器也可以 运行ning 在另一台计算机上。 但是,这些方法调用没有序列化;对象方法本身必须是线程安全的,因为每个方法调用都是 运行 在一个新线程中。
以下是创建我们自己的托管类型并让服务器侦听可能来自不同计算机的请求的示例(尽管在此示例中,客户端 运行 正在同一台计算机上)。客户端跨两个线程在托管对象上调用 increment
1000 次,但是方法实现不是在锁定下完成的,因此当我们全部完成时 self.x
的结果值不是 1000。另外,当我们通过方法 get_x
同时检索两次 x
的值时,我们看到两个调用或多或少同时启动:
from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time
class MathManager(BaseManager):
pass
class MathClass:
def __init__(self, x=0):
self.x = x
def increment(self, y):
temp = self.x
time.sleep(.01)
self.x = temp + 1
def get_x(self):
print(f'get_x started by thread {get_ident()}', time.time())
time.sleep(2)
return self.x
def set_x(self, value):
self.x = value
def server(event1, event2):
MathManager.register('Math', MathClass)
manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
manager.start()
event1.set() # show we are started
print('Math server running; waiting for shutdown...')
event2.wait() # wait for shutdown
print("Math server shutting down.")
manager.shutdown()
def client():
MathManager.register('Math')
manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
manager.connect()
math = manager.Math()
pool = ThreadPool(2)
pool.map(math.increment, [1] * 1000)
results = [pool.apply_async(math.get_x) for _ in range(2)]
for result in results:
print(result.get())
def main():
event1 = Event()
event2 = Event()
t = Thread(target=server, args=(event1, event2))
t.start()
event1.wait() # server started
client() # now we can run client
event2.set()
t.join()
# Required for Windows:
if __name__ == '__main__':
main()
打印:
Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.
抱歉,如果这是一个愚蠢的问题,但我无法理解经理在 python 中的工作方式。
假设我有一个管理器,其中包含要在所有进程之间共享的字典。我希望一次只有一个进程写入字典,而许多其他进程从字典中读取。
- 这是否可以同时发生,没有同步原语,或者如果 read/writes 同时发生,是否会出现问题?
- 如果我想让多个进程同时写入字典怎么办 - 允许还是会中断(我知道这可能导致竞争条件,但它会出错吗)?
- 此外,管理器是像时尚一样在队列中处理每个读写事务,一次处理一个事务,还是一次性处理所有事务?
https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes
这取决于你如何写入字典,即操作是否原子:
my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic
因此,在上面第一行代码中创建新密钥和更新现有密钥是原子操作。但是第二行代码实际上是多次操作相当于:
temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
因此,如果两个进程并行执行 my_dict[some_key] += 1
,它们可能会读取相同的 temp = my_dict[some_key]
值并将 temp
递增到相同的新值,最终效果将是字典值只会增加一次。这可以证明如下:
from multiprocessing import Pool, Manager, Lock
def init_pool(the_lock):
global lock
lock = the_lock
def worker1(d):
for _ in range(1000):
with lock:
d['x'] += 1
def worker2(d):
for _ in range(1000):
d['y'] += 1
if __name__ == '__main__':
lock = Lock()
with Manager() as manager, \
Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
d = manager.dict()
d['x'] = 0
d['y'] = 0
# worker1 will serialize with a lock
pool.apply_async(worker1, args=(d,))
pool.apply_async(worker1, args=(d,))
# worker2 will not serialize with a lock:
pool.apply_async(worker2, args=(d,))
pool.apply_async(worker2, args=(d,))
# wait for the 4 tasks to complete:
pool.close()
pool.join()
print(d)
打印:
{'x': 2000, 'y': 1162}
更新
就序列化而言:
BaseManager
创建服务器,默认使用 Linux 的套接字和 Windows 的命名管道。因此,例如,基本上您针对托管字典执行的每个方法都非常类似于通过消息传递实现的远程方法调用。这也意味着服务器也可以 运行ning 在另一台计算机上。 但是,这些方法调用没有序列化;对象方法本身必须是线程安全的,因为每个方法调用都是 运行 在一个新线程中。
以下是创建我们自己的托管类型并让服务器侦听可能来自不同计算机的请求的示例(尽管在此示例中,客户端 运行 正在同一台计算机上)。客户端跨两个线程在托管对象上调用 increment
1000 次,但是方法实现不是在锁定下完成的,因此当我们全部完成时 self.x
的结果值不是 1000。另外,当我们通过方法 get_x
同时检索两次 x
的值时,我们看到两个调用或多或少同时启动:
from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time
class MathManager(BaseManager):
pass
class MathClass:
def __init__(self, x=0):
self.x = x
def increment(self, y):
temp = self.x
time.sleep(.01)
self.x = temp + 1
def get_x(self):
print(f'get_x started by thread {get_ident()}', time.time())
time.sleep(2)
return self.x
def set_x(self, value):
self.x = value
def server(event1, event2):
MathManager.register('Math', MathClass)
manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
manager.start()
event1.set() # show we are started
print('Math server running; waiting for shutdown...')
event2.wait() # wait for shutdown
print("Math server shutting down.")
manager.shutdown()
def client():
MathManager.register('Math')
manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
manager.connect()
math = manager.Math()
pool = ThreadPool(2)
pool.map(math.increment, [1] * 1000)
results = [pool.apply_async(math.get_x) for _ in range(2)]
for result in results:
print(result.get())
def main():
event1 = Event()
event2 = Event()
t = Thread(target=server, args=(event1, event2))
t.start()
event1.wait() # server started
client() # now we can run client
event2.set()
t.join()
# Required for Windows:
if __name__ == '__main__':
main()
打印:
Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.