如何在将下一个项目放入队列之前等待其他 python 进程将字符串放入队列
How to wait for other python processes to put string into queue before putting next item into queue
我有一个小例子来解决我的问题:
from multiprocessing import Process, Queue
def test_1(q,j):
for i in range(10):
q.put('test_{}: '.format(j) + str(i),block=False)
q = Queue()
p1 = Process(target=test_1, args=(q,1))
p2 = Process(target=test_1, args=(q,2))
p1.start()
p2.start()
with open('test1.txt', 'w') as file:
while p1.is_alive() or p2.is_alive() or not q.empty():
try:
value = q.get(timeout = 1)
file.write(value + '\n')
except Exception as qe:
print("Empty Queue or dead process")
p1.join()
p2.join()
然后我的输出是这样的:
test_1: 0
test_1: 1
test_1: 2
test_1: 3
test_1: 4
test_1: 5
test_1: 6
test_1: 7
test_1: 8
test_1: 9
test_2: 0
test_2: 1
test_2: 2
test_2: 3
test_2: 4
test_2: 5
test_2: 6
test_2: 7
test_2: 8
test_2: 9
如何获得如下输出:
test_1: 0
test_2: 0
test_1: 1
test_2: 1
test_1: 2
test_2: 2
.
.
.
等等。
如果有人能帮助我,那就太好了。
出于您的兴趣,我想稍后使用它来匹配输入和输出向量作为机器学习方法的训练数据。
提前致谢,
亚兹
一般来说,当我想通过 python 使用多线程时,我会使用 ``ThreadPool```
我不确定这是否真的是您想要的,但您可以使用此处编写的此函数轻松获得结果:
from multiprocessing.pool import ThreadPool
from psutil import cpu_count
def test_1(thread):
for i in range(10):
string = ('test_{}: '.format(thread) + str(i))
print(string)
pool = ThreadPool(processes=cpu_count(logical=True))
lines_async = pool.apply_async(test_1, args=([1]))
lines_async2 = pool.apply_async(test_1, args=([2]))
lines_async.get()
lines_async2.get()
结果在这里:
test_1: 0
test_2: 0
test_1: 1
test_2: 1
test_1: 2
test_2: 2
test_1: 3
test_2: 3
test_1: 4
test_2: 4
test_1: 5
test_2: 5
test_1: 6
test_2: 6
test_1: 7
test_2: 7
test_2: 8
test_1: 8
test_2: 9
test_1: 9
但这与您执行的方法不同
编辑:如果您想获取函数的值,您可以使用它:
from multiprocessing.pool import ThreadPool
from psutil import cpu_count
pool = ThreadPool(processes=cpu_count(logical=True))
def test_1(thread, i):
string = ('test_{}: '.format(thread) + str(i))
return string
for i in range(10):
lines_async = pool.apply_async(test_1, args=([1,i]))
lines_async2 = pool.apply_async(test_1, args=([2,i]))
string1 = lines_async.get()
string2 = lines_async2.get()
print(string1)
print(string2)
这比你想要的更相似并且给出了相同的结果。
这里有一个基于并发的解决方案(真正的并发problem/solution):
from multiprocessing import Queue
from threading import Thread, Lock, Condition
def test_1(q,j,sem,other_has_writen,th):
for i in range(10):
# print("Th%i adquires conditional sem"%j)
other_has_writen.acquire()
# print("Th%i th=%i"%(j,th[0]))
if th[0]!=j:
# print("Th%i on IF"%j)
# print("Th%i adquires sem"%j)
sem.acquire()
th[0] = j
# print("Th%i releases sem"%j)
sem.release()
# print("Th%i th_after=%i"%(j,th[0]))
# print("Th%i prints -----> %i"%(j,i))
q.put('test_{}: '.format(j) + str(i),block=False)
other_has_writen.notify_all()
else:
# print("Th%i waits"%j)
other_has_writen.wait()
q.put('test_{}: '.format(j) + str(i),block=False)
other_has_writen.release()
# print("Th%i releases conditional sem"%j)
# other_has_writen.notify_all()
th = [1]
sem = Lock()
other_has_writen = Condition()
q = Queue()
p1 = Thread(target=test_1, args=(q,1,sem,other_has_writen,th))
p2 = Thread(target=test_1, args=(q,2,sem,other_has_writen,th))
p1.start()
p2.start()
# print('------- Program continues -------')
with open('test1.txt', 'w') as file:
while p1.is_alive() or p2.is_alive() or not q.empty():
try:
value = q.get(timeout = 1)
file.write(value + '\n')
except Exception as qe:
print("Empty Queue or dead process")
p1.join()
p2.join()
输出将始终交替,因为您需要另一个线程通知他已完成写入:
test_2: 0
test_1: 0
test_1: 1
test_2: 1
test_2: 2
test_1: 2
test_1: 3
test_2: 3
test_2: 4
test_1: 4
test_1: 5
test_2: 5
test_2: 6
test_1: 6
test_1: 7
test_2: 7
test_2: 8
test_1: 8
test_1: 9
test_2: 9
我有一个小例子来解决我的问题:
from multiprocessing import Process, Queue
def test_1(q,j):
for i in range(10):
q.put('test_{}: '.format(j) + str(i),block=False)
q = Queue()
p1 = Process(target=test_1, args=(q,1))
p2 = Process(target=test_1, args=(q,2))
p1.start()
p2.start()
with open('test1.txt', 'w') as file:
while p1.is_alive() or p2.is_alive() or not q.empty():
try:
value = q.get(timeout = 1)
file.write(value + '\n')
except Exception as qe:
print("Empty Queue or dead process")
p1.join()
p2.join()
然后我的输出是这样的:
test_1: 0
test_1: 1
test_1: 2
test_1: 3
test_1: 4
test_1: 5
test_1: 6
test_1: 7
test_1: 8
test_1: 9
test_2: 0
test_2: 1
test_2: 2
test_2: 3
test_2: 4
test_2: 5
test_2: 6
test_2: 7
test_2: 8
test_2: 9
如何获得如下输出:
test_1: 0
test_2: 0
test_1: 1
test_2: 1
test_1: 2
test_2: 2
.
.
.
等等。
如果有人能帮助我,那就太好了。
出于您的兴趣,我想稍后使用它来匹配输入和输出向量作为机器学习方法的训练数据。
提前致谢,
亚兹
一般来说,当我想通过 python 使用多线程时,我会使用 ``ThreadPool``` 我不确定这是否真的是您想要的,但您可以使用此处编写的此函数轻松获得结果:
from multiprocessing.pool import ThreadPool
from psutil import cpu_count
def test_1(thread):
for i in range(10):
string = ('test_{}: '.format(thread) + str(i))
print(string)
pool = ThreadPool(processes=cpu_count(logical=True))
lines_async = pool.apply_async(test_1, args=([1]))
lines_async2 = pool.apply_async(test_1, args=([2]))
lines_async.get()
lines_async2.get()
结果在这里:
test_1: 0
test_2: 0
test_1: 1
test_2: 1
test_1: 2
test_2: 2
test_1: 3
test_2: 3
test_1: 4
test_2: 4
test_1: 5
test_2: 5
test_1: 6
test_2: 6
test_1: 7
test_2: 7
test_2: 8
test_1: 8
test_2: 9
test_1: 9
但这与您执行的方法不同
编辑:如果您想获取函数的值,您可以使用它:
from multiprocessing.pool import ThreadPool
from psutil import cpu_count
pool = ThreadPool(processes=cpu_count(logical=True))
def test_1(thread, i):
string = ('test_{}: '.format(thread) + str(i))
return string
for i in range(10):
lines_async = pool.apply_async(test_1, args=([1,i]))
lines_async2 = pool.apply_async(test_1, args=([2,i]))
string1 = lines_async.get()
string2 = lines_async2.get()
print(string1)
print(string2)
这比你想要的更相似并且给出了相同的结果。
这里有一个基于并发的解决方案(真正的并发problem/solution):
from multiprocessing import Queue
from threading import Thread, Lock, Condition
def test_1(q,j,sem,other_has_writen,th):
for i in range(10):
# print("Th%i adquires conditional sem"%j)
other_has_writen.acquire()
# print("Th%i th=%i"%(j,th[0]))
if th[0]!=j:
# print("Th%i on IF"%j)
# print("Th%i adquires sem"%j)
sem.acquire()
th[0] = j
# print("Th%i releases sem"%j)
sem.release()
# print("Th%i th_after=%i"%(j,th[0]))
# print("Th%i prints -----> %i"%(j,i))
q.put('test_{}: '.format(j) + str(i),block=False)
other_has_writen.notify_all()
else:
# print("Th%i waits"%j)
other_has_writen.wait()
q.put('test_{}: '.format(j) + str(i),block=False)
other_has_writen.release()
# print("Th%i releases conditional sem"%j)
# other_has_writen.notify_all()
th = [1]
sem = Lock()
other_has_writen = Condition()
q = Queue()
p1 = Thread(target=test_1, args=(q,1,sem,other_has_writen,th))
p2 = Thread(target=test_1, args=(q,2,sem,other_has_writen,th))
p1.start()
p2.start()
# print('------- Program continues -------')
with open('test1.txt', 'w') as file:
while p1.is_alive() or p2.is_alive() or not q.empty():
try:
value = q.get(timeout = 1)
file.write(value + '\n')
except Exception as qe:
print("Empty Queue or dead process")
p1.join()
p2.join()
输出将始终交替,因为您需要另一个线程通知他已完成写入:
test_2: 0
test_1: 0
test_1: 1
test_2: 1
test_2: 2
test_1: 2
test_1: 3
test_2: 3
test_2: 4
test_1: 4
test_1: 5
test_2: 5
test_2: 6
test_1: 6
test_1: 7
test_2: 7
test_2: 8
test_1: 8
test_1: 9
test_2: 9