Ordered semaphore.acquire() in Python
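I have multiple multiprocessing.Process() instances acquiring and releasing:
s = Semaphore(5)
Are the s.acquire() calls guaranteed to complete in order?
If not, what can I use so that the first process to request the resource is the first to access it?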
According to the threading.Semaphore.acquire documentation:
If the internal counter is larger than zero on entry, decrement it by one and return True immediately.
If the internal counter is zero on entry, block until awoken by a call to release(). Once awoken (and the counter is greater than 0), decrement the counter by 1 and return True. Exactly one thread will be awoken by each call to release(). The order in which threads are awoken should not be relied on.
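(emphasis mine)
In other words, as long as the semaphore's counter has not reached zero, calls to acquire succeed immediately. After that, every call blocks until release is called. At that point, any one of the blocked threads may return from its acquire.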
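The counter behaviour described in the quoted documentation can be observed without any extra threads by using non-blocking acquires. This is only a small sketch: the initial value of 2 and the names results and after_release are chosen for illustration.

```python
import threading

# A semaphore with two permits (the value 2 is arbitrary, for illustration).
sem = threading.Semaphore(2)

# Non-blocking acquires return True while the counter is above zero,
# and False (instead of blocking) once it has reached zero.
results = [sem.acquire(blocking=False) for _ in range(3)]
print(results)  # [True, True, False]

# A release() raises the counter again, so the next acquire succeeds.
sem.release()
after_release = sem.acquire(blocking=False)
print(after_release)  # True
```

With blocking acquires, the third call would have waited for the release instead of returning False, and with several waiters the documentation gives no guarantee about which one is woken.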
Here is a Minimal Reproducible Example:
import threading

NB_TOTAL = 20
NB_SHARED = 5

sem = threading.Semaphore(NB_SHARED)


def acquire_semaphore_then_do_something(thread_number: int):
    sem.acquire()
    do_something(thread_number)
    sem.release()


def do_something(number: int):
    print(number)


# create the threads
threads = [threading.Thread(target=acquire_semaphore_then_do_something, args=(thread_number,))
           for thread_number in range(NB_TOTAL)]
# start them all
for thread in threads:
    thread.start()
# wait for all of them to finish
for thread in threads:
    thread.join()
If you really need the threads to run in a strict order, I suggest handling things differently:
import threading

NB_TOTAL = 20
NB_SHARED = 5

THREAD_LOCKS = [threading.Lock() for _ in range(NB_TOTAL)]
LOCKS_LOCK = threading.Lock()  # lock for the locks management
NEXT_UNLOCK_INDEX = NB_SHARED


def acquire_sequentially_then_do_something(thread_number: int):
    # try to acquire the dedicated lock
    THREAD_LOCKS[thread_number].acquire()  # blocking
    # dedicated lock acquired, proceeding
    do_something(thread_number)
    # now it has finished its work, it can unlock the next thread
    global NEXT_UNLOCK_INDEX
    with LOCKS_LOCK:
        # check the index while holding the lock, so that two threads
        # finishing at the same time cannot race on it
        if NEXT_UNLOCK_INDEX < NB_TOTAL:
            THREAD_LOCKS[NEXT_UNLOCK_INDEX].release()
            NEXT_UNLOCK_INDEX += 1


def do_something(number: int):
    print(number)


# lock all the locks, except the NB_SHARED first : indexes from 0 to NB_SHARED-1
for number, lock in enumerate(THREAD_LOCKS):
    if number >= NB_SHARED:
        assert lock.acquire(blocking=False)
# create the threads
threads = [threading.Thread(target=acquire_sequentially_then_do_something, args=(thread_number,))
           for thread_number in range(NB_TOTAL)]
# start them all
for thread in threads:
    thread.start()
# wait for all of them to finish
for thread in threads:
    thread.join()
This correctly prints:
0
1
2
[...]
17
18
19
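The idea is that NEXT_UNLOCK_INDEX is incremented by 1 each time a dedicated lock is released, so that the corresponding thread can finally acquire its lock and proceed. The waiting threads are therefore admitted in index order, which gives "first come, first served" as long as the indexes reflect the order of the requests.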
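When the order of arrival is not known in advance, the same idea can be turned around: each caller that has to wait queues its own wake-up signal, and release hands the permit to the oldest one. The following is only a sketch of that variation, not something from the standard library: FifoSemaphore and demo are hypothetical names, and it covers threads only (for multiprocessing.Process the queue would have to live in shared state, which is not shown here).

```python
import threading
import time
from collections import deque


class FifoSemaphore:
    """Hypothetical helper: a counting semaphore that wakes waiters in arrival order."""

    def __init__(self, value: int):
        self._value = value
        self._mutex = threading.Lock()
        self._waiters = deque()  # one Event per blocked caller, oldest first

    def acquire(self) -> None:
        with self._mutex:
            # Take a permit directly only if nobody is already queued.
            if self._value > 0 and not self._waiters:
                self._value -= 1
                return
            ticket = threading.Event()
            self._waiters.append(ticket)
        ticket.wait()  # release() hands the permit over by setting the event

    def release(self) -> None:
        with self._mutex:
            if self._waiters:
                self._waiters.popleft().set()  # pass the permit to the oldest waiter
            else:
                self._value += 1


def demo(n: int = 5) -> list:
    """Queue n workers one by one, then record the order in which they get the permit."""
    sem = FifoSemaphore(1)
    order = []
    sem.acquire()  # the main thread holds the only permit

    def worker(i: int) -> None:
        sem.acquire()
        order.append(i)
        sem.release()

    threads = []
    for i in range(n):
        thread = threading.Thread(target=worker, args=(i,))
        thread.start()
        while len(sem._waiters) <= i:  # wait until worker i is actually queued
            time.sleep(0.001)
        threads.append(thread)
    sem.release()  # let the queued workers through, oldest first
    for thread in threads:
        thread.join()
    return order


if __name__ == "__main__":
    print(demo())  # [0, 1, 2, 3, 4]
```

The demo forces a known arrival order by waiting until each worker is queued before starting the next one. In real code, "first" simply means whichever thread reaches acquire first.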