How can I get a value returned by a thread/subprocess inside this loop, to check whether the workers should stay active or be shut down?

#import threading
import multiprocessing
import time

condition = True
finish_state = False
x = 0
all_processes = []

def operation_process(x):
    #many operations with x value that if they take a long time will be closed because the other thread will set the boolean to True and close all open threads
    time.sleep(20) #for example these operations with x take 20 seconds in this case

def finish_process(finish_state):
    finish_state = True
    time.sleep(5) #time to send the boolean that terminates all started threads
    return finish_state

def main_process(condition, finish_state, x):

    while condition == True:
        
        #Create a new thread and fork the code()
        #THIS IS THE LINE WHERE I AM SUPPOSED TO BE EXPECTING TO RECEIVE THE RETURN VALUE, AT THE CYCLE NUMBER IN WHICH THE FUNCTION finish_state() TIME HAS FINISHED...
        #thr1 = threading.Thread(target=finish_process, args=(finish_state, ))
        thr1 = multiprocessing.Process(target=finish_process, args=(finish_state, ))

        #thr2 = threading.Thread(target=operation_process, args=(x, ))
        thr2 = multiprocessing.Process(target=operation_process, args=(x, ))

        x = x + 1
        print("cycle number: " + str(x))
        
        #Only in the first cycle
        if(x == 1):
            thr1.start()
            thr2.start()
            all_processes.append(thr1)
            all_processes.append(thr2)

            #print(threading.activeCount()) #to list open threads
            print(multiprocessing.active_children()) #to list open subprocesses

        if(finish_state == True):
            print("loop ends!")
            for process in all_processes:
                process.terminate()
            condition = False

main_process(condition, finish_state, x)

print("continue the code with other things after all threads/processes except the main one were closed with the loop that started them... ")

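How can I receive the value finish_state = True that is set and returned by the function finish_process() running inside the thread?

The objective is that the main thread calls main_process(), which contains a loop simulating a do-while loop. In that loop a thread is created only during the first cycle (that is, when x = 1). The idea is that the loop repeats until the thread returns finish_state = True, at which point the main thread leaves the loop because finish_state is no longer False.

The problem is that I need the thread to be started only in the first cycle, since otherwise a new thread would be created on every iteration, yet I still have to wait for it to return finish_state = True at some later point.

I decided to switch from threads to subprocesses because they are easier to kill: even though threads share variables, I did not find a way to kill them selectively, so I use the terminate() method to kill the processes. I am not sure this is the best approach, because I still need the finish_state signal to reach the main thread somehow, so that it can decide whether all the other threads or processes, everything except itself, should be closed.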


Edit: When I tried Frank Yellin's code, it gave this error in the console (the same traceback was printed twice):

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\Maty0\Desktop\P\Python\Threads\thread_way.py", line 36, in <module>
    main_process(finish_state)
  File "C:\Users\Maty0\Desktop\P\Python\Threads\thread_way.py", line 26, in main_process
    thr1.start()
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
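
The idiom that this error message refers to can be sketched roughly as follows. This is only a minimal illustration, not the code that produced the traceback: `worker`, `main` and the `delay` argument are hypothetical names chosen for the example. The point is that processes are started only from code reached through the `if __name__ == "__main__":` guard, because with the spawn start method each child re-imports the main module.

```python
import multiprocessing
import time


def worker(delay):
    # Stand-in for real work; `worker` and `delay` are hypothetical names.
    time.sleep(delay)


def main():
    # Child processes are created only from here, never at import time.
    proc = multiprocessing.Process(target=worker, args=(0.1,))
    proc.start()
    proc.join()
    return proc.exitcode


if __name__ == "__main__":
    # With the "spawn" start method (the default on Windows), each child
    # re-imports this file; the guard keeps the re-import from starting
    # further processes.
    print("child exit code:", main())
```

If the call to `main()` sat at module level instead, every spawned child would try to start processes of its own while importing the file, which is the situation the RuntimeError describes.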

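I think you're going about this completely the wrong way. I'm also not sure why you have global variables that you then pass as arguments to functions, rather than just using local variables.

In any case, you're looking for something like this. You need to make finish_state an Event, which one thread can set and the other threads can observe.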
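
As a quick illustration of the Event semantics described here, the following sketch exercises an event inside a single process. The variable name `event` and the 0.1 second timeout are arbitrary choices for the example.

```python
import multiprocessing

event = multiprocessing.Event()

# A new event starts out cleared.
print(event.is_set())            # False

# wait() with a timeout returns False if the event was not set in time.
print(event.wait(timeout=0.1))   # False

# Once any holder of the event calls set(), every observer sees it.
event.set()
print(event.is_set())            # True
print(event.wait(timeout=0.1))   # True, returns immediately
```

Because `wait()` reports whether the event was set, the same object works both as a blocking signal and as a flag that can be polled.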

import multiprocessing
import time

finish_state = multiprocessing.Event()

def operation_process(x):
    # many operations with x value that if they take a long time will be closed because the other thread will set the boolean to True and close all open threads
    time.sleep(20)  # for example these operations with x take 20 seconds in this case

def finish_process(finish_state):
    time.sleep(5)
    finish_state.set()


def main_process(finish_state):
    # Create a new thread and fork the code()
    # THIS IS THE LINE WHERE I AM SUPPOSED TO BE EXPECTING TO RECEIVE THE RETURN VALUE, AT THE CYCLE NUMBER IN WHICH THE FUNCTION finish_state() TIME HAS FINISHED...
    # thr1 = threading.Thread(target=finish_process, args=(finish_state, ))
    thr1 = multiprocessing.Process(target=finish_process, args=(finish_state,))
    thr1.start()

    thr2 = multiprocessing.Process(target=operation_process, args=(0,))
    thr2.start()

    finish_state.wait()

    for process in [thr1, thr2]:
        process.terminate()

if __name__ == '__main__':
    main_process(finish_state)

If you want to print a status report once per second until it finishes, replace the plain finish_state.wait() call with:

    while not finish_state.wait(timeout=1):
        # .wait() returns False when it times out, so this body runs about once per second
        print("still running...")  # placeholder for the status report
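
Putting the pieces together, a self-contained version might look like the sketch below. It is an illustration under assumptions rather than a definitive implementation: the parameters `finish_delay`, `work_duration` and `poll_interval`, the local names `signaller` and `worker`, and the returned cycle count are all additions made for the example, and the delays are shortened so that it finishes quickly.

```python
import multiprocessing
import time


def operation_process(x, duration):
    # Stand-in for the long-running operations on x.
    time.sleep(duration)


def finish_process(finish_state, delay):
    # Signal the parent after `delay` seconds.
    time.sleep(delay)
    finish_state.set()


def main_process(finish_delay=0.5, work_duration=5.0, poll_interval=0.1):
    finish_state = multiprocessing.Event()
    signaller = multiprocessing.Process(
        target=finish_process, args=(finish_state, finish_delay))
    worker = multiprocessing.Process(
        target=operation_process, args=(0, work_duration))
    signaller.start()
    worker.start()

    cycles = 0
    # wait() returns False on timeout and True once the event has been set.
    while not finish_state.wait(timeout=poll_interval):
        cycles += 1
        print("cycle number:", cycles)

    # Stop whatever is still running, then reap the processes.
    for process in (signaller, worker):
        process.terminate()
        process.join()
    return cycles


if __name__ == "__main__":
    print("loop ended after", main_process(), "cycles")
```

The processes are created once, before the loop, so no per-iteration check such as `x == 1` is needed, and the loop ends as soon as the event is observed.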