如何在 concurrent.futures.ThreadPoolExecutor.map 运行期间定时执行另一个函数?
How can I periodically execute another function while concurrent.futures.ThreadPoolExecutor.map is running?
我有一个程序,目前使用 concurrent.futures.ThreadPoolExecutor 并发运行多个任务。这些任务通常是 I/O 密集型的,需要读取并打印大型文本文件的内容。我需要每分钟中断一次 ThreadPoolExecutor 并执行另一个函数。
我尝试使用 result 方法,但没有成功。
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    # Function-scope import: this snippet shows no top-level import of requests.
    import requests

    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output and return ``None``."""
    print(x)
def main():
    """Run ``printt`` on every password, checking the connection every 60 s.

    Reads one password per line from the file named by the module-level
    ``passwd_wdlst`` (not defined in this snippet; it must be set elsewhere),
    submits one ``printt`` task per password to a thread pool, and waits for
    them in 60-second slices.  Whenever a slice ends with tasks still
    outstanding, ``check_connection()`` is called before waiting again.
    """
    # Function-scope import: this snippet shows no import of concurrent.futures.
    import concurrent.futures

    with open(passwd_wdlst, "r") as file:
        passwords = file.read().splitlines()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Executor.map() returns a plain iterator with no .result(); submitting
        # individual futures lets wait() apply a timeout to the whole batch.
        pending = {executor.submit(printt, password) for password in passwords}
        while pending:
            done, pending = concurrent.futures.wait(pending, timeout=60)
            for future in done:
                future.result()  # re-raise any exception raised by the worker
            if pending:
                print("cheking the internet connection")
                check_connection()


if __name__ == "__main__":
    main()
我怎样才能让它正常工作?或者,能否把线程池放在计时器中运行来实现这一点?
下面是我对你需求的猜测。该程序使用 multiprocessing.pool 模块中的 ThreadPool 类,因为正如我在评论中所说,可以通过终止池来结束运行时间过长的挂起线程。程序使用 apply_async 方法逐个提交任务,该方法返回一个 AsyncResult 实例;调用其 get 方法时可以传入 timeout 参数,从而取得工作函数 printt 的返回值,或者在超时时得到 TimeoutError 异常。每成功完成一个任务,对应的密码就会从原始密码集合中删除,因此最后剩下的只有发生超时的密码。如果还有这样的密码,就终止池并调用 check_connection,然后在循环中只为上一轮失败的密码重新提交任务。
from multiprocessing.pool import ThreadPool, TimeoutError
import requests
import time
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output and return ``None``."""
    print(x)
TIMEOUT = 60  # seconds allowed for each round of submitted tasks


def main():
    """Run ``printt`` on every password, re-submitting the ones that time out.

    Reads one password per line from the file named by the module-level
    ``passwd_wdlst`` (not defined in this snippet; it must be set elsewhere).
    Each round submits every remaining password to a fresh ``ThreadPool`` and
    gives the whole round ``TIMEOUT`` seconds.  Passwords whose task finished
    are removed from the set; if any remain, the pool is terminated,
    ``check_connection()`` is called, and the next round retries only those.
    """
    with open(passwd_wdlst, "r") as file:
        # A set drops duplicate lines and allows removal of finished items.
        passwords = set(file.read().splitlines())
    MAX_THREADS = 30  # some suitable number

    while passwords:  # as long as there are passwords left to process
        n_threads = min(len(passwords), MAX_THREADS)
        # The context manager calls pool.terminate() on exit, so the pool is
        # also released when every task succeeds or when get() re-raises.
        with ThreadPool(n_threads) as pool:
            results = {
                pool.apply_async(printt, args=(password,)): password
                for password in passwords
            }
            # Monotonic clock: the deadline is unaffected by wall-clock changes.
            t_end = time.monotonic() + TIMEOUT
            for result, password in results.items():
                try:
                    # Wait only for what is left of this round's time budget.
                    result.get(timeout=max(0, t_end - time.monotonic()))
                except TimeoutError:
                    continue  # still running: keep it for the next round
                passwords.remove(password)  # successfully completed
        # Anything left timed out; check the connection before retrying.
        if passwords:
            check_connection()


if __name__ == "__main__":
    main()
谢谢 Boobo 的回答。我还想到了另一个解决方案:在循环中使用 submit 方法而不是 map 函数,代码如下:
import concurrent
import time
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    # Function-scope import: this snippet's import lines do not include requests.
    import requests

    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output, prefixed with ``this is number``."""
    print(f'this is number {x}')
def counter(x):
    """Run the connection check once the deadline ``x`` has passed.

    Args:
        x: Deadline expressed as a ``time.time()`` timestamp.

    When the current time is later than ``x`` the function waits on
    ``input()``, calls ``check_connection()`` and then sets the module-level
    ``t_end`` to 10 seconds from now.  Otherwise it does nothing.
    """
    global t_end
    if time.time() > x:  # deadline reached: check the connection
        # NOTE(review): the prompt originally began with a literal "3[0;37;40m",
        # which looks like an ANSI colour escape that lost its "\03" prefix in
        # transit; restored here as "\033[0;37;40m" -- confirm.
        input('\033[0;37;40mChecking the connection please wait ...')
        check_connection()
        t_end = time.time() + 10
# "import concurrent" alone does not load the futures submodule.
import concurrent.futures

with open("wordlist.lst", "r") as file:
    # NOTE(review): loaded but not used by the loop below -- confirm intent.
    passwds_wdlst = file.read().splitlines()

# Deadline (time.time() timestamp) for the next connection check; counter()
# moves it forward.  A "global" statement is a no-op at module level.
t_end = time.time() + 5  # first check after 5 seconds

for x in range(100000, 0, -1):
    # NOTE(review): a new executor per item waits for that item before the next
    # iteration, so the work is effectively sequential; kept because the
    # periodic counter() check relies on this pacing.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        counter(t_end)  # checking if the countdown is finished
        # Pass the callable and its argument separately; submit(printt(x))
        # would call printt here and hand the executor its None return value.
        executor.submit(printt, x)
此外,还有一个更好的解决方案:继承 ThreadPoolExecutor 并重写 map 方法,将计时器嵌入其中,
像这样:
import concurrent
import time
# "import concurrent" alone does not load the futures submodule.
import concurrent.futures


class Myconcurrent(concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose ``map`` calls ``counter()`` before dispatching."""

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Call ``counter()``, then delegate to ``ThreadPoolExecutor.map``.

        Args:
            fn: Callable applied to the items of ``iterables``.
            *iterables: Argument iterables, consumed in lockstep.
            timeout: Forwarded unchanged to the parent ``map``.
            chunksize: Forwarded unchanged to the parent ``map``.

        Returns:
            The result iterator produced by the parent ``map``.
        """
        counter()  # checking if the countdown is finished
        # Return the iterator and honour the caller's timeout/chunksize; the
        # previous version discarded the result and always passed the defaults.
        return super().map(fn, *iterables, timeout=timeout, chunksize=chunksize)
def printt(y, x):
    """Print ``x``, then a ``tried {y} for username {x}`` line.

    Args:
        y: Value reported as the one that was tried.
        x: Value reported as the username.
    """
    print(x)
    print(f'tried {y} for username {x}')
def counter():
    """Pause for user input once the module-level deadline ``t_end`` has passed.

    Reads the module-level ``t_end`` and ``global_count``.  When the current
    time is later than ``t_end`` the function waits on ``input()`` and then
    sets ``t_end`` to ``global_count`` seconds from now.  Otherwise it does
    nothing.
    """
    global t_end
    if time.time() > t_end:
        # NOTE(review): the prompt originally began with a literal "3[0;37;40m",
        # which looks like an ANSI colour escape that lost its "\03" prefix in
        # transit; restored here as "\033[0;37;40m" -- confirm.
        input('\033[0;37;40mchanging the ip please wait ...')
        t_end = time.time() + global_count
def main(count):
    """Call ``printt`` for every (number, user) pair, pausing periodically.

    Args:
        count: Interval in seconds.  It is stored in the module-level
            ``global_count`` and used to set the first deadline ``t_end``;
            ``counter()`` reads both.

    Users are read one per line from ``users.lst``.  For each user the numbers
    100000 down to 1 are passed, as strings, to ``printt`` through
    ``Myconcurrent.map``.
    """
    global global_count, t_end

    with open("users.lst", "r") as file:
        usrs_wdlst = file.read().splitlines()
    # NOTE(review): the word list is loaded but never used below -- the inner
    # loop iterates over a numeric range instead; confirm which is intended.
    with open("wordlist.lst", "r") as file:
        passwds_wdlst = file.read().splitlines()

    global_count = count
    t_end = time.time() + count
    for user in usrs_wdlst:
        for x in range(100000, 0, -1):
            # Myconcurrent.map() calls counter() before dispatching, so the
            # deadline is checked once per item.
            with Myconcurrent() as executor:
                executor.map(printt, [str(x)], [str(user)])


count = 6
if __name__ == "__main__":
    main(count)
我有一个程序,目前使用 concurrent.futures.ThreadPoolExecutor 并发运行多个任务。这些任务通常是 I/O 密集型的,需要读取并打印大型文本文件的内容。我需要每分钟中断一次 ThreadPoolExecutor 并执行另一个函数。我尝试使用 result 方法,但没有成功。
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    # Function-scope import: this snippet shows no top-level import of requests.
    import requests

    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output and return ``None``."""
    print(x)
def main():
    """Run ``printt`` on every password, checking the connection every 60 s.

    Reads one password per line from the file named by the module-level
    ``passwd_wdlst`` (not defined in this snippet; it must be set elsewhere),
    submits one ``printt`` task per password to a thread pool, and waits for
    them in 60-second slices.  Whenever a slice ends with tasks still
    outstanding, ``check_connection()`` is called before waiting again.
    """
    # Function-scope import: this snippet shows no import of concurrent.futures.
    import concurrent.futures

    with open(passwd_wdlst, "r") as file:
        passwords = file.read().splitlines()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Executor.map() returns a plain iterator with no .result(); submitting
        # individual futures lets wait() apply a timeout to the whole batch.
        pending = {executor.submit(printt, password) for password in passwords}
        while pending:
            done, pending = concurrent.futures.wait(pending, timeout=60)
            for future in done:
                future.result()  # re-raise any exception raised by the worker
            if pending:
                print("cheking the internet connection")
                check_connection()


if __name__ == "__main__":
    main()
我怎样才能让它正常工作?或者,能否把线程池放在计时器中运行来实现这一点?
下面是我对你需求的猜测。该程序使用 multiprocessing.pool 模块中的 ThreadPool 类,因为正如我在评论中所说,可以通过终止池来结束运行时间过长的挂起线程。程序使用 apply_async 方法逐个提交任务,该方法返回一个 AsyncResult 实例;调用其 get 方法时可以传入 timeout 参数,从而取得工作函数 printt 的返回值,或者在超时时得到 TimeoutError 异常。每成功完成一个任务,对应的密码就会从原始密码集合中删除,因此最后剩下的只有发生超时的密码。如果还有这样的密码,就终止池并调用 check_connection,然后在循环中只为上一轮失败的密码重新提交任务。
from multiprocessing.pool import ThreadPool, TimeoutError
import requests
import time
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output and return ``None``."""
    print(x)
TIMEOUT = 60  # seconds allowed for each round of submitted tasks


def main():
    """Run ``printt`` on every password, re-submitting the ones that time out.

    Reads one password per line from the file named by the module-level
    ``passwd_wdlst`` (not defined in this snippet; it must be set elsewhere).
    Each round submits every remaining password to a fresh ``ThreadPool`` and
    gives the whole round ``TIMEOUT`` seconds.  Passwords whose task finished
    are removed from the set; if any remain, the pool is terminated,
    ``check_connection()`` is called, and the next round retries only those.
    """
    with open(passwd_wdlst, "r") as file:
        # A set drops duplicate lines and allows removal of finished items.
        passwords = set(file.read().splitlines())
    MAX_THREADS = 30  # some suitable number

    while passwords:  # as long as there are passwords left to process
        n_threads = min(len(passwords), MAX_THREADS)
        # The context manager calls pool.terminate() on exit, so the pool is
        # also released when every task succeeds or when get() re-raises.
        with ThreadPool(n_threads) as pool:
            results = {
                pool.apply_async(printt, args=(password,)): password
                for password in passwords
            }
            # Monotonic clock: the deadline is unaffected by wall-clock changes.
            t_end = time.monotonic() + TIMEOUT
            for result, password in results.items():
                try:
                    # Wait only for what is left of this round's time budget.
                    result.get(timeout=max(0, t_end - time.monotonic()))
                except TimeoutError:
                    continue  # still running: keep it for the next round
                passwords.remove(password)  # successfully completed
        # Anything left timed out; check the connection before retrying.
        if passwords:
            check_connection()


if __name__ == "__main__":
    main()
谢谢 Boobo 的回答。我还想到了另一个解决方案:在循环中使用 submit 方法而不是 map 函数,代码如下:
import concurrent
import time
def check_connection():
    """Probe internet connectivity; wait for the user if it stays down.

    Sends a GET request to http://google.com with a 10-second timeout.  If the
    first attempt raises ``requests.ConnectionError`` or ``requests.Timeout``,
    a warning is printed and the request is retried once.  If the retry fails
    as well, the function blocks on ``input()`` until the user presses Enter.
    """
    # Function-scope import: this snippet's import lines do not include requests.
    import requests

    for attempt in range(2):
        try:
            requests.get("http://google.com", timeout=10)
        except (requests.ConnectionError, requests.Timeout):
            if attempt == 0:
                print("No internet connection waiting for connection")
        else:
            print("Connected to the internet")
            return
    # Both attempts failed: pause until the user presses Enter.
    input("please fix the internet issue to continue")
def printt(x):
    """Print ``x`` to standard output, prefixed with ``this is number``."""
    print(f'this is number {x}')
def counter(x):
    """Run the connection check once the deadline ``x`` has passed.

    Args:
        x: Deadline expressed as a ``time.time()`` timestamp.

    When the current time is later than ``x`` the function waits on
    ``input()``, calls ``check_connection()`` and then sets the module-level
    ``t_end`` to 10 seconds from now.  Otherwise it does nothing.
    """
    global t_end
    if time.time() > x:  # deadline reached: check the connection
        # NOTE(review): the prompt originally began with a literal "3[0;37;40m",
        # which looks like an ANSI colour escape that lost its "\03" prefix in
        # transit; restored here as "\033[0;37;40m" -- confirm.
        input('\033[0;37;40mChecking the connection please wait ...')
        check_connection()
        t_end = time.time() + 10
# "import concurrent" alone does not load the futures submodule.
import concurrent.futures

with open("wordlist.lst", "r") as file:
    # NOTE(review): loaded but not used by the loop below -- confirm intent.
    passwds_wdlst = file.read().splitlines()

# Deadline (time.time() timestamp) for the next connection check; counter()
# moves it forward.  A "global" statement is a no-op at module level.
t_end = time.time() + 5  # first check after 5 seconds

for x in range(100000, 0, -1):
    # NOTE(review): a new executor per item waits for that item before the next
    # iteration, so the work is effectively sequential; kept because the
    # periodic counter() check relies on this pacing.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        counter(t_end)  # checking if the countdown is finished
        # Pass the callable and its argument separately; submit(printt(x))
        # would call printt here and hand the executor its None return value.
        executor.submit(printt, x)
此外,还有一个更好的解决方案:继承 ThreadPoolExecutor 并重写 map 方法,将计时器嵌入其中,像这样:
import concurrent
import time
# "import concurrent" alone does not load the futures submodule.
import concurrent.futures


class Myconcurrent(concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose ``map`` calls ``counter()`` before dispatching."""

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Call ``counter()``, then delegate to ``ThreadPoolExecutor.map``.

        Args:
            fn: Callable applied to the items of ``iterables``.
            *iterables: Argument iterables, consumed in lockstep.
            timeout: Forwarded unchanged to the parent ``map``.
            chunksize: Forwarded unchanged to the parent ``map``.

        Returns:
            The result iterator produced by the parent ``map``.
        """
        counter()  # checking if the countdown is finished
        # Return the iterator and honour the caller's timeout/chunksize; the
        # previous version discarded the result and always passed the defaults.
        return super().map(fn, *iterables, timeout=timeout, chunksize=chunksize)
def printt(y, x):
    """Print ``x``, then a ``tried {y} for username {x}`` line.

    Args:
        y: Value reported as the one that was tried.
        x: Value reported as the username.
    """
    print(x)
    print(f'tried {y} for username {x}')
def counter():
    """Pause for user input once the module-level deadline ``t_end`` has passed.

    Reads the module-level ``t_end`` and ``global_count``.  When the current
    time is later than ``t_end`` the function waits on ``input()`` and then
    sets ``t_end`` to ``global_count`` seconds from now.  Otherwise it does
    nothing.
    """
    global t_end
    if time.time() > t_end:
        # NOTE(review): the prompt originally began with a literal "3[0;37;40m",
        # which looks like an ANSI colour escape that lost its "\03" prefix in
        # transit; restored here as "\033[0;37;40m" -- confirm.
        input('\033[0;37;40mchanging the ip please wait ...')
        t_end = time.time() + global_count
def main(count):
    """Call ``printt`` for every (number, user) pair, pausing periodically.

    Args:
        count: Interval in seconds.  It is stored in the module-level
            ``global_count`` and used to set the first deadline ``t_end``;
            ``counter()`` reads both.

    Users are read one per line from ``users.lst``.  For each user the numbers
    100000 down to 1 are passed, as strings, to ``printt`` through
    ``Myconcurrent.map``.
    """
    global global_count, t_end

    with open("users.lst", "r") as file:
        usrs_wdlst = file.read().splitlines()
    # NOTE(review): the word list is loaded but never used below -- the inner
    # loop iterates over a numeric range instead; confirm which is intended.
    with open("wordlist.lst", "r") as file:
        passwds_wdlst = file.read().splitlines()

    global_count = count
    t_end = time.time() + count
    for user in usrs_wdlst:
        for x in range(100000, 0, -1):
            # Myconcurrent.map() calls counter() before dispatching, so the
            # deadline is checked once per item.
            with Myconcurrent() as executor:
                executor.map(printt, [str(x)], [str(user)])


count = 6
if __name__ == "__main__":
    main(count)