Memory Safe Permutations Generator with Increasing Length Value in ThreadPoolExecutor
Thanks to @rici in the comments for steering me in the right direction on this. I've discovered that concurrent.futures' map() and submit() process their iterables eagerly, whereas Python's built-in map() iterates lazily, which is what you want when working with large product and permutation spaces. The concurrent.futures route eats up all available RAM once it reaches combinations of length 2 or more in the example code below.
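A minimal sketch of the difference (the toy words list just stands in for the endpoint list):

from concurrent.futures import ThreadPoolExecutor
from itertools import product

words = ["a", "b", "c"]

# Eager: Executor.map() submits one task per element right away, so the
# entire product() space is pulled into memory as pending futures.
with ThreadPoolExecutor(max_workers=2) as executor:
    eager_results = executor.map(len, product(words, repeat=3))

# Lazy: the built-in map() produces one tuple at a time, on demand.
lazy_results = map(len, product(words, repeat=3))
first = next(lazy_results)  # only now is the first tuple generated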
What I'm trying to do now is take what I have in the updated code below and make it multithreaded. Specifically, I want to multithread Python's built-in map(), pulling its iterables from a common product generator. I've left the "working" multithreaded attempt commented out for reference, to show what I was trying to accomplish.
I stumbled upon a potential fix for the main_lazy function, but I'm confused about how to implement it with my code, whose function returns two values. The maps, zips and lambdas confuse me here, and I'm not sure how the chunking would work with the space I'm dealing with, but maybe it makes sense to someone else.
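For reference, here is how I understand the chunking idea would combine with a worker that returns two values. This is only a sketch: chunked() is a hypothetical helper of my own, and ping_current_api_endpoint, list_of_individual_api_endpoints and MAX_WORKERS refer to the code further below:

from concurrent.futures import ThreadPoolExecutor
from itertools import islice, product

def chunked(iterable, size):
    # Hypothetical helper: lazily yield lists of up to `size` items.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def ping_chunk(chunk):
    # One task per chunk; each item still produces (status_code, endpoint).
    return [ping_current_api_endpoint(endpoint) for endpoint in chunk]

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    space = product(list_of_individual_api_endpoints, repeat=2)
    # executor.map() still walks the chunks eagerly, but one future now
    # covers 1000 endpoints instead of one, which cuts the overhead.
    for pairs in executor.map(ping_chunk, chunked(space, 1000)):
        for status_code, endpoint in pairs:
            print(status_code, endpoint)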
For now, here is the single-threaded version of the memory-safe code that I'm trying to multithread. Note that I don't care about the math behind how many combinations this generates; that's irrelevant to my use case as long as it keeps memory usage down. Here's the updated code.
To reproduce:
- Download VAmPI and start the server
- Update BASE_URL in the code below to match your server
- Run this code
import concurrent.futures
from itertools import product, chain, islice
import requests, urllib.request
# ---------------------------------------------------------------------------- #
# Variables #
# ---------------------------------------------------------------------------- #
MAX_ENDPOINT_PERMUTATION_LENGTH = 3
MAX_WORKERS = 6
# BASE_URL = 'http://localhost:5000/'
BASE_URL = 'http://172.16.1.82:5000//' # This should be the Vampi url of the
# server on your machine
if BASE_URL[-1] != "/":
    BASE_URL = BASE_URL + "/"
# ---------------------------------------------------------------------------- #
# Retrieve list of endpoints to product'ize #
# ---------------------------------------------------------------------------- #
list_of_individual_api_endpoints = []
url = r"https://gist.githubusercontent.com/yassineaboukir/8e12adefbd505ef704674ad6ad48743d/raw/3ea2b7175f2fcf8e6de835c72cb2b2048f73f847/List%2520of%2520API%2520endpoints%2520&%2520objects"
file = urllib.request.urlopen(url)
for line in file:
    decoded_line = line.decode("utf-8").replace("\n","")
    list_of_individual_api_endpoints.append(decoded_line)
# ---------------------------------------------------------------------------- #
# The multithreaded function we're going to use #
# ---------------------------------------------------------------------------- #
def ping_current_api_endpoint(endpoint):
    # Build a proper endpoint path from the passed-in tuple
    new_endpoint = ""
    for x in endpoint:
        new_endpoint += str(x) + "/"
    new_endpoint = new_endpoint[:-1]
    # Ping the endpoint to get a response code
    response = requests.get(BASE_URL + str(new_endpoint))
    status_code = response.status_code
    return status_code, new_endpoint
# # ---------------------------------------------------------------------------- #
# # Main Function #
# # ---------------------------------------------------------------------------- #
# # MULTITHREADED ATTEMPT. EATS UP RAM WHEN GETTING TO DEPTH OF 2
# def main():
#     results_dict = {'endpoint':[], 'status_code': []}
#     # Start the threadpool
#     with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#         # Try/Except for a keyboard interrupt. If this is not the correct implementation
#         # to stop a multithreaded pool, please demonstrate the correct way
#         try:
#             # Iterate from 1 to 3
#             for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
#                 print("Checking endpoints with depth of", i)
#                 # Can change this from .map to .submit, tried them both
#                 future = executor.submit(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
#                 status_code = future.result()[0]
#                 endpoint = future.result()[1]
#                 if str(status_code) != "404":
#                     results_dict['endpoint'].append(endpoint)
#                     results_dict['status_code'].append(status_code)
#                     print("Endpoint:", endpoint, ", Status Code:", status_code)
#         except KeyboardInterrupt:
#             print("Early stopping engaged...")
#             pass
#     # Print the results dict
#     print(results_dict)
# LAZY MAP FUNCTION, SINGLE THREADED, THAT I'D LIKE TO TURN INTO MULTI
def main_lazy():
    results_dict = {'endpoint':[], 'status_code': []}
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
        print("Checking endpoints with depth of", i)
        results = map(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
        for status_code, endpoint in results:
            # print(status_code, endpoint)
            if str(status_code) != "404":
                results_dict['endpoint'].append(endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", endpoint, ", Status Code:", status_code)
# ---------------------------------------------------------------------------- #
# Start Program #
# ---------------------------------------------------------------------------- #
if __name__ == "__main__":
    # main()
    main_lazy()
Here is the code for the potential fix I mentioned above:

from itertools import product
from concurrent.futures import ThreadPoolExecutor, as_completed
words = ["I", "like", "to", "take", "my", "dogs", "for", "a",
         "walk", "every", "day", "after", "work"]

def gen():
    for i in product(words, repeat=3):
        yield i

def worker(rec_str):
    return rec_str

def main():
    with ThreadPoolExecutor() as executor:
        fs = (executor.submit(worker, i) for i in gen())
        for i in as_completed(fs):
            print(i.result())

if __name__ == "__main__":
    main()
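Note that the example above doesn't actually solve the memory problem: as_completed() converts its argument to a set internally, so every future from gen() is still created up front. Here is a minimal bounded-submission sketch that keeps only a small window of futures alive (bounded_map is a hypothetical helper of mine, not from that post):

from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice

def bounded_map(executor, fn, iterable, window=100):
    # Submit at most `window` tasks at a time and refill as tasks finish,
    # so a huge generator is never materialized all at once.
    it = iter(iterable)
    pending = {executor.submit(fn, item) for item in islice(it, window)}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            yield future.result()
        for item in islice(it, len(done)):
            pending.add(executor.submit(fn, item))

With this, main_lazy's inner loop could become for status_code, endpoint in bounded_map(executor, ping_current_api_endpoint, product(...)) while memory stays flat.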
I found a solution. After the section of the code that fetches the endpoint list from GitHub, I use the following:
import threading, time
from pandas import DataFrame

# ---------------------------------------------------------------------------- #
#                        Function to Ping API Endpoints                         #
# ---------------------------------------------------------------------------- #
# Create Thread Safe Class for Generator and Worker Function
results_dict = {"endpoint": [], "status_code": []}
class LockedIterator(object):
    def __init__(self, iterator):
        self.lock = threading.Lock()
        self.iterator = iter(iterator)

    def __iter__(self): return self

    def __next__(self):
        with self.lock:
            return self.iterator.__next__()
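# Why the lock: calling next() on one shared bare generator from several
# threads at once can raise "ValueError: generator already executing".
# LockedIterator serializes each __next__ call, so any number of worker
# threads can safely share a single generator.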
def generator_function(repeat):
    for x in product(list_of_individual_api_endpoints, repeat=repeat):
        yield x
def worker_function(current_gen_value):
    for endpoint in current_gen_value:
        # time.sleep(randint(0,2))
        if len(endpoint) > 1:
            new_endpoint = ""
            for x in endpoint:
                new_endpoint += str(x) + "/"
            new_endpoint = new_endpoint[:-1]
        else:
            new_endpoint = endpoint[0]
        response = requests.get(BASE_URL + str(new_endpoint))
        status_code = response.status_code
        if str(status_code) != "404":
            results_dict['endpoint'].append(endpoint)
            results_dict['status_code'].append(status_code)
            print("Endpoint:", endpoint, ", Status Code:", status_code)
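# Note: in CPython, list.append is atomic under the GIL, so the worker
# threads can append to results_dict's lists without an extra lock.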
# ---------------------------------------------------------------------------- #
# Main Program Start #
# ---------------------------------------------------------------------------- #
start_time = time.time()
for repeat in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
    thread_safe_generator = LockedIterator(generator_function(repeat))
    threads_list = []
    for _ in range(MAX_WORKERS):
        thread = threading.Thread(target=worker_function, args=(thread_safe_generator,))
        # thread.daemon = True
        threads_list.append(thread)
    for thread in threads_list:
        thread.start()
    for thread in threads_list:
        thread.join()
results_df = DataFrame.from_dict(results_dict)
results_df = results_df.sort_values(by='status_code', ascending=True).reset_index(drop=True)
results_df.to_csv("endpoint_results.csv", index=False)
print(results_df)
print("Elapsed time:", int((time.time() - start_time) / 60), "minutes.")
This gives the workers a thread-safe and memory-safe generator, plus multiple threads to consume it. The only thing still missing is how to make CTRL + C work with this, but oh well.
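For what it's worth, one way CTRL + C could be wired in (a sketch under assumptions; stop_event and the try/except around the joins are additions of mine, not part of the code above):

import threading

stop_event = threading.Event()  # hypothetical: not in the original code

def worker_function(current_gen_value):
    for endpoint in current_gen_value:
        if stop_event.is_set():
            return  # bail out once early stopping has been requested
        ...  # same request/record logic as the worker above

try:
    for thread in threads_list:
        thread.join()
except KeyboardInterrupt:
    stop_event.set()  # ask workers to stop after their current item
    for thread in threads_list:
        thread.join()
    print("Early stopping engaged...")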