Pool only executes a single thread instead of 4, and how do I make it infinite?

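So I am developing a little Python tool to stress test an API of an application.

I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish), and the suggestion here was to use ThreadPool, which I tried as follows: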

def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"


if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")

When I invoke the script, I get consistent output such as:

4

Executing in MainThread

Exiting Main Thread

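I'm not sure why. As you can see in the commented-out block, I tried different approaches, and it still executes only once.

My goal is to make the script run in a loop, always running n threads at any time. The test_post (respectively, post) functions return the HTTP response code and the content; I'd like to use this later to print/stop when the response code is not 200 OK.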

Your first problem is that you have already called your function in the MainThread by writing:

pool.apply_async(test_post())

...instead of passing test_post as an argument for a call to be executed in a worker thread:

pool.apply_async(test_post)
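To make the difference visible, here is a minimal sketch. The function whoami is a hypothetical stand-in for test_post (it is not from your script); it just reports which thread runs it. Note that passing the result of a call to the pool does not fail loudly: the worker tries to call the returned value, and the resulting TypeError only surfaces when you call .get() on the result.

```python
import threading
from multiprocessing.pool import ThreadPool


def whoami():
    # Hypothetical stand-in for test_post: report the executing thread's name.
    return threading.current_thread().name


pool = ThreadPool(processes=4)

# whoami() with parentheses runs right here, in the calling thread.
called_here = whoami()

# Handing that return value (a str) to the pool makes a worker try to call
# the str; the error is stored in the AsyncResult and re-raised by .get().
broken = pool.apply_async(called_here)
try:
    broken.get()
    hidden_error = None
except TypeError as exc:
    hidden_error = exc

# Passing the function object itself lets a worker thread call it.
called_in_worker = pool.apply_async(whoami).get()

pool.close()
pool.join()

print("called directly: {}".format(called_here))
print("called by pool:  {}".format(called_in_worker))
print("hidden error:    {!r}".format(hidden_error))
```

This also explains why your script printed "Executing in MainThread" exactly once and then exited quietly: you never asked the AsyncResult for its outcome.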

OP: I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish) ...

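You need to distinguish between a unit of work (job, task) and a thread. The whole point of using a pool in the first place is re-using the executors, be it threads or processes. The workers are already created when a Pool is instantiated, and as long as you don't close the Pool, all initial threads stay alive. So you don't care about recreating threads; you just call pool methods of an existing pool as often as you have some work you want to distribute. Pool takes this job (a pool-method call) and creates tasks out of it. These tasks are put on an unbounded queue. Whenever a worker is finished with a task, it will blockingly try to get() a new task from such an inqueue.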
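A small sketch of that re-use, assuming a hypothetical task function that only returns the name of the thread it ran on: 20 tasks are submitted to a pool of 4, and the set of distinct thread names never grows beyond the pool size.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def task(i):
    # Hypothetical unit of work: pretend to do something, then report
    # which worker thread executed it.
    time.sleep(0.01)
    return threading.current_thread().name


pool = ThreadPool(4)

# 20 tasks go onto the pool's queue; the 4 existing workers pick them up.
async_results = [pool.apply_async(task, (i,)) for i in range(20)]
names = [r.get() for r in async_results]

pool.close()
pool.join()

worker_names = set(names)
print("{} tasks ran on {} threads".format(len(names), len(worker_names)))
```

No thread was started or restarted by hand here; the pool kept its workers alive and fed them from the queue.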

OP: Pool only executes a single thread instead of 4...I tried different ways and it still does it only once.

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

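...is a single-call, single-task-producing job. If you want more than one execution of func, you either have to call pool.apply_async() multiple times, or you use a mapping pool method like

pool.map(func, iterable, chunksize=None)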

..., which maps one function over an iterable. pool.apply_async is non-blocking, which is why it is "async". It immediately returns an AsyncResult object on which you can (blockingly) call .wait() or .get().
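Side by side, the two options might look like the sketch below. The function square is a hypothetical placeholder for real work; everything else is the regular ThreadPool API.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    # Hypothetical placeholder for real work.
    return x * x


pool = ThreadPool(4)

# One apply_async call produces exactly one task.
single = pool.apply_async(square, args=(3,))
single.wait()        # blocks until the task is done, returns None
nine = single.get()  # blocks if needed and returns the task's result

# Several executions therefore need several apply_async calls ...
handles = [pool.apply_async(square, args=(x,)) for x in range(5)]
via_apply_async = [h.get() for h in handles]

# ... or a single mapping call, which blocks until all results are in
# and returns them in input order.
via_map = pool.map(square, range(5))

pool.close()
pool.join()

print(nine)
print(via_apply_async)
print(via_map)
```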


Through the comments it became clear that you want endless and immediate replacements for finished tasks (a self-produced input stream), and that the program should stop on KeyboardInterrupt or when a result does not have a certain value.

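You can use the callback parameter of apply_async to schedule new tasks as soon as any of the old tasks is finished. The difficulty lies in what to do with the MainThread in the meantime, to prevent the whole script from ending prematurely while keeping it responsive to KeyboardInterrupt. Letting the MainThread sleep in a loop lets it still react immediately to KeyboardInterrupt while preventing an early exit. In case a result should stop the program, you can let the callback terminate the pool. The MainThread then just has to include a check of the pool status in its sleep loop.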

import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    time.sleep(randint(1, 3))
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4

    post_cnt = count()

    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

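    try:
        # pool._state is a private attribute; 0 is RUN on the Python 2
        # used here and may differ on other Python versions
        while pool._state == 0:  # check if pool is still alive
            time.sleep(1)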
    except KeyboardInterrupt:
        print(" got interrupt")

Example output with KeyboardInterrupt:

$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt

Example output with termination due to an unwanted return value:

$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating

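Note that in your scenario you can also call apply_async more often than N_WORKERS times for your initial distribution, so that there is some buffer for reduced latency.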