Python threading: make the main thread report progress
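I run some jobs in parallel that sometimes take a long time, so I would like the main thread to report progress periodically, for example every hour.
Below is a simplified version of what I came up with. The code runs test_function in 2 threads, taking its arguments from input_arguments. Every 5 seconds it prints the percentage of jobs that are finished.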
import threading
import queue
import time

def test_function(x):
    time.sleep(4)
    print("Finished ", x)

num_processes = 2
input_arguments = range(10)

# Define a worker which will continuously execute function taking input parameters from the queue
def worker():
    while True:
        x = q.get()
        if x is None:
            break
        test_function(x)
        q.task_done()

# Initialize queue and the threads
q = queue.Queue()
threads = []
for i in range(num_processes):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Create a queue of input parameters for function
for item in input_arguments:
    q.put(item)

# Report progress every 5 seconds
report_progress(q)

# stop workers
for i in range(num_processes):
    q.put(None)
for t in threads:
    t.join()
where report_progress is defined as follows:
def report_progress(q):
    qsize_init = q.qsize()
    while not q.empty():
        time.sleep(5)
        portion_finished = 1 - q.qsize() / qsize_init
        print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
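One caveat with this measure: q.qsize() counts the items still waiting in the queue, so a job is counted as "finished" the moment a worker takes it, not when test_function returns. Because the workers are started before the queue is filled, qsize_init may also come out smaller than the number of jobs. The sketch below only illustrates that difference; the helper name qsize_vs_finished and the two events are made up for the illustration and are not part of the original code.

```python
import queue
import threading


def qsize_vs_finished():
    """Return (items still waiting in the queue, jobs actually finished)
    at a moment when one job has been taken but has not completed yet."""
    q = queue.Queue()
    for item in range(3):
        q.put(item)

    taken = threading.Event()     # set once the worker has pulled an item
    release = threading.Event()   # lets the worker finish its job
    finished = []

    def worker():
        x = q.get()
        taken.set()
        release.wait()            # the job is "running" while we measure
        finished.append(x)
        q.task_done()

    t = threading.Thread(target=worker)
    t.start()
    taken.wait()
    waiting = q.qsize()           # one of the three items is gone from the queue...
    done = len(finished)          # ...but no job has finished yet
    release.set()
    t.join()
    return waiting, done


print(qsize_vs_finished())  # (2, 0)
```

If the reported percentage should reflect completed jobs, a counter incremented by the worker after each call is one option.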
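However, I would like to report progress every hour instead of every 5 seconds, and with a sleep that long the program could sit idle for many minutes after all jobs have finished.
Another possibility is to define report_progress differently: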
def report_progress(q):
    qsize_init = q.qsize()
    time_start = time.time()
    while not q.empty():
        current_time = time.time()
        if current_time - time_start > 5:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            time_start = time.time()
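I am worried that checking this condition in a tight loop will eat CPU resources. It is only a small fraction, but over many hours it could add up.
Is there a standard way to handle this?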
Python: 3.6
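For now I will use the simple solution suggested by @Andriy Maletsky in the comments.
The main thread checks every few seconds whether q is still non-empty and, if more than 1 hour has passed since the last report, prints a progress message.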
time_between_reports = 3600
time_between_checks = 5

def report_progress_until_finished(q):
    qsize_init = q.qsize()
    last_report_time = time.time()
    while not q.empty():
        time_elapsed = time.time() - last_report_time
        if time_elapsed > time_between_reports:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            last_report_time = time.time()
        time.sleep(time_between_checks)
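A possible alternative to polling every few seconds is to block on threading.Event.wait(timeout): the main thread sleeps for the full report interval without spinning, yet wakes up immediately once the last job is done. The following is only a minimal sketch under some assumptions, not the solution adopted above. The names run_parallel, slow_job, report_interval and report are hypothetical, progress is tracked with an explicit counter of completed jobs rather than q.qsize(), and None is assumed never to be a real argument.

```python
import queue
import threading
import time


def run_parallel(func, args, num_threads=2, report_interval=3600.0, report=print):
    """Run func(x) for each x in args on worker threads and report progress
    every report_interval seconds. Returns a list of (x, exception) failures."""
    args = list(args)
    total = len(args)
    q = queue.Queue()
    lock = threading.Lock()
    finished = 0                   # jobs whose call has returned or raised
    errors = []
    all_done = threading.Event()   # set once every job has been processed
    if total == 0:
        all_done.set()

    def worker():
        nonlocal finished
        while True:
            x = q.get()
            if x is None:          # sentinel: no more work for this thread
                break
            try:
                func(x)
            except Exception as exc:   # keep the worker alive, remember the failure
                with lock:
                    errors.append((x, exc))
            with lock:
                finished += 1
                if finished == total:
                    all_done.set()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in args:
        q.put(item)
    for _ in threads:
        q.put(None)                # one sentinel per worker

    # Event.wait blocks without spinning. It returns True as soon as the event
    # is set, or False once the timeout expires, which triggers a report.
    while not all_done.wait(timeout=report_interval):
        with lock:
            done = finished
        report("run_parallel: {:.1%} jobs are finished".format(done / total))

    for t in threads:
        t.join()
    return errors


def slow_job(x):
    """Stand-in for the question's test_function (hypothetical)."""
    time.sleep(0.2)


if __name__ == "__main__":
    # Short interval so the demo prints a few reports; use 3600 for hourly ones.
    run_parallel(slow_job, range(10), num_threads=2, report_interval=0.5)
```

With report_interval=3600 the main thread would print at most once per hour and return as soon as the last job completes, instead of up to time_between_checks seconds later.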