Memory Safe Permutations Generator with Increasing Length Value in ThreadPoolExecutor

Thanks to @rici in the comments for steering me in the right direction on this. I have discovered that concurrent.futures' map() and submit() process their iterables immediately, whereas Python's built-in map() function can step through an iterable lazily, which is highly desirable when working with large product and permutation spaces. The concurrent.futures route eats up all available RAM once it reaches combinations of depth 2 or more in the example code below.
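
To make the difference concrete, here is a minimal illustration of the lazy behavior (the identifiers are just for this demo):

from itertools import product

# Built-in map() is lazy: this line allocates almost nothing, even though
# the underlying space holds 2**30 tuples.
lazy_results = map(len, product(["a", "b"], repeat=30))

# Work only happens as items are requested:
print(next(lazy_results))  # computes exactly one result, prints 30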

What I'm trying to do now is implement, with multithreading, what I have in my updated code below. Specifically, I want to multithread Python's built-in map() function, pulling iterables from a common product generator. I have commented out the "working" multithreaded example for reference, to show what I was trying to accomplish.

I stumbled across a potential fix for the main_lazy function, but I'm confused about how to implement it with my code, whose function returns 2 values. The maps, zips, and lambdas in it confuse me, and I'm not sure how the chunk stuff would work with the space I'm using, but maybe it will make sense to someone else.
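
For what it's worth, the chunked idea can be expressed without the zip/lambda business. Below is a minimal sketch, assuming a helper I'm calling lazy_executor_map (my own name, not a library function), which submits one chunk of the product space at a time and yields the same (status_code, endpoint) pairs my function returns:

from itertools import islice

def lazy_executor_map(executor, fn, iterable, chunksize=1000):
    # Materialize only `chunksize` items of the huge iterable at a time,
    # so memory usage stays bounded regardless of the total space size.
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            break
        # executor.map() over a small chunk is safe, and yields results in
        # order, e.g. the (status_code, endpoint) tuples my function returns
        yield from executor.map(fn, chunk)

main_lazy below could then swap its map(...) call for lazy_executor_map(executor, ping_current_api_endpoint, product(...)) inside a ThreadPoolExecutor context.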

For now, here is the single-threaded version of the memory-safe code that I'm trying to multithread.

Note that I don't care about the math behind how many combinations this generates, since that's irrelevant to my use case as long as memory usage stays low. Here is the updated code.

To reproduce:

  1. Download VAmPI and start the server
  2. Update BASE_URL in the code below to match your server
  3. Run this code
import concurrent.futures
from itertools import product
import requests, urllib.request


# ---------------------------------------------------------------------------- #
#                                   Variables                                  #
# ---------------------------------------------------------------------------- #
MAX_ENDPOINT_PERMUTATION_LENGTH = 3
MAX_WORKERS = 6
# BASE_URL = 'http://localhost:5000/'
BASE_URL = 'http://172.16.1.82:5000//' # This should be the Vampi url of the 
                                       # server on your machine
if BASE_URL[-1] != "/":
    BASE_URL = BASE_URL + "/"


# ---------------------------------------------------------------------------- #
#                   Retrieve list of endpoints to product'ize                  #
# ---------------------------------------------------------------------------- #
list_of_individual_api_endpoints = []
url = r"https://gist.githubusercontent.com/yassineaboukir/8e12adefbd505ef704674ad6ad48743d/raw/3ea2b7175f2fcf8e6de835c72cb2b2048f73f847/List%2520of%2520API%2520endpoints%2520&%2520objects"
file = urllib.request.urlopen(url)
for line in file:
    decoded_line = line.decode("utf-8").replace("\n","")
    list_of_individual_api_endpoints.append(decoded_line)


# ---------------------------------------------------------------------------- #
#                 The multithreaded function we're going to use                #
# ---------------------------------------------------------------------------- #
def ping_current_api_endpoint(endpoint):
    # Build a slash-separated endpoint path from the tuple passed in
    new_endpoint = "/".join(str(x) for x in endpoint)
    # Ping the endpoint to get a response code
    response = requests.get(BASE_URL + str(new_endpoint))
    status_code = response.status_code
    return status_code, new_endpoint


# # ---------------------------------------------------------------------------- #
# #                                 Main Function                                #
# # ---------------------------------------------------------------------------- #
# # MULTITHREADED ATTEMPT. EATS UP RAM WHEN GETTING TO DEPTH OF 2
# def main():
#     results_dict = {'endpoint':[], 'status_code': []}
#     # Start the threadpool
#     with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#         # Try/Except for a keyboard interrupt. If this is not the correct implementation
#         # to stop a multithreaded pool, please demonstrate the correct way
#         try:
#             # Iterate from 1 to 3
#             for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
#                 print("Checking endpoints with depth of", i)
#                 # Can change this from .map to .submit, tried them both
#                 future = executor.submit(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
#                 status_code = future.result()[0]
#                 endpoint = future.result()[1]
#                 if str(status_code) != "404":
#                     results_dict['endpoint'].append(endpoint)
#                     results_dict['status_code'].append(status_code)
#                     print("Endpoint:", endpoint, ", Status Code:", status_code)
#         except KeyboardInterrupt:
#             print("Early stopping engaged...")
#             pass
#     # Print the results dict
#     print(results_dict)


# LAZY MAP FUNCTION, SINGLE THREADED, THAT I'D LIKE TO TURN INTO MULTI
def main_lazy():
    results_dict = {'endpoint':[], 'status_code': []}
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
        print("Checking endpoints with depth of", i)
        # Built-in map() pulls lazily from the product generator
        results = map(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
        for status_code, endpoint in results:
            if str(status_code) != "404":
                results_dict['endpoint'].append(endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", endpoint, ", Status Code:", status_code)
    # Print the results dict
    print(results_dict)

# ---------------------------------------------------------------------------- #
#                                 Start Program                                #
# ---------------------------------------------------------------------------- #
if __name__ == "__main__":
    # main()
    main_lazy()
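
For reference, the following standalone example shows futures being submitted from a generator and consumed with as_completed():
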
from itertools import product
from concurrent.futures import ThreadPoolExecutor, as_completed


words = ["I", "like", "to", "take", "my", "dogs", "for", "a",
         "walk", "every", "day", "after", "work"]


def gen():
    for i in product(words, repeat=3):
        yield i


def worker(rec_str):
    return rec_str


def main():
    with ThreadPoolExecutor() as executor:
        fs = (executor.submit(worker, i) for i in gen())
        for i in as_completed(fs):
            print(i.result())


if __name__ == "__main__":
    main()
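
One caveat with that example: as_completed() converts its argument to a set before waiting, so every future is still created up front and memory is not actually bounded. A hedged sketch of a bounded alternative (bounded_submit is my own name, not a library function):

from concurrent.futures import wait, FIRST_COMPLETED
from itertools import islice

def bounded_submit(executor, fn, iterable, max_in_flight=64):
    # Keep at most `max_in_flight` futures alive at once, yielding results
    # as they complete and refilling the window from the iterable.
    iterator = iter(iterable)
    in_flight = {executor.submit(fn, x) for x in islice(iterator, max_in_flight)}
    while in_flight:
        done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
        for future in done:
            yield future.result()
        # Submit one replacement task per completed one
        for x in islice(iterator, len(done)):
            in_flight.add(executor.submit(fn, x))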

I figured out a solution. After the section of the code that grabs the endpoint list from GitHub, I use the following:

# ---------------------------------------------------------------------------- #
#                        Function to Ping API Endpoints                        #
# ---------------------------------------------------------------------------- #
# Additional imports needed for this part
import threading, time
from pandas import DataFrame

# Create a thread-safe wrapper class for the generator, plus the worker function
results_dict = {"endpoint": [], "status_code": []}
class LockedIterator(object):
    """Wraps an iterator so multiple threads can safely call next() on it."""
    def __init__(self, iterator):
        self.lock = threading.Lock()
        self.iterator = iter(iterator)
    def __iter__(self):
        return self
    def __next__(self):
        # Only one thread at a time may advance the underlying iterator
        with self.lock:
            return next(self.iterator)
            
def generator_function(repeat):
    for x in product(list_of_individual_api_endpoints, repeat=repeat):
        yield x

def worker_function(current_gen_value):
    # Each thread keeps pulling tuples from the shared, lock-protected
    # generator until it is exhausted
    for endpoint in current_gen_value:
        # Join the tuple of path segments into a single endpoint string
        new_endpoint = "/".join(endpoint)
        response = requests.get(BASE_URL + new_endpoint)
        status_code = response.status_code
        if str(status_code) != "404":
            results_dict['endpoint'].append(endpoint)
            results_dict['status_code'].append(status_code)
            print("Endpoint:", endpoint, ", Status Code:", status_code)


# ---------------------------------------------------------------------------- #
#                              Main Program Start                              #
# ---------------------------------------------------------------------------- #
start_time = time.time()

for repeat in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):

    thread_safe_generator = LockedIterator(generator_function(repeat))

    threads_list = []
    for _ in range(MAX_WORKERS):
        thread = threading.Thread(target=worker_function, args=(thread_safe_generator,))
        # thread.daemon = True
        threads_list.append(thread)
    for thread in threads_list:
        thread.start()
    for thread in threads_list:
        thread.join()
    
results_df = DataFrame.from_dict(results_dict)
results_df = results_df.sort_values(by='status_code', ascending=True).reset_index(drop=True)
results_df.to_csv("endpoint_results.csv", index=False)
print(results_df)
print("Elapsed time:", int((time.time() - start_time) / 60), "minutes." )

This gives a thread- and memory-safe generator plus multiple worker threads. The only thing still missing is how to make CTRL + C work with this, but oh well.
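
For the CTRL + C part, one common pattern (untested against this exact script; stop_event is my own name) is a shared threading.Event that the workers poll, combined with joining on a timeout so the main thread stays responsive to KeyboardInterrupt:

stop_event = threading.Event()

def worker_function(current_gen_value):
    for endpoint in current_gen_value:
        if stop_event.is_set():
            break  # the main thread asked us to stop
        ...  # same request/record logic as above

try:
    # Join with a timeout in a loop so KeyboardInterrupt reaches the main thread
    while any(t.is_alive() for t in threads_list):
        for t in threads_list:
            t.join(timeout=0.5)
except KeyboardInterrupt:
    print("Early stopping engaged...")
    stop_event.set()
    for t in threads_list:
        t.join()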