我怎样才能在 concurrent.futures.ThreadPoolExecutor.map 函数中执行其他函数?

how can i time executing other function in concurrent.futures.ThreadPoolExecutor.map function?

我有一个程序,我目前正在同时使用 concurrent.futures.ThreadPoolExecutor 到 运行 多个任务。这些任务通常 I/O 绑定,调用读取和打印大型文本文件的输出,我需要每分钟中断 ThreadPoolExecutor 并执行另一个函数 我尝试使用 result 方法但它没有用

def check_connection():
    try:
        request= requests.get("http://google.com",timeout=10)
        print("Connected to the internet")
        pass
    except(requests.ConnectionError, requests.Timeout) as exception:
        print("No internet connection waiting for connection")
        try:
            request= requests.get("http://google.com",timeout=10)
            print("Connected to the internet")
            pass
        except(requests.ConnectionError, requests.Timeout) as exception:
            input("please fix the internet issue to continue")
            
def printt(x):
    print(x)

def main():
    with open(passwd_wdlst, "r") as file:
        passwords = file.read().splitlines()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        try:
            executor.map(executor.map(printt,passwords)).result(timeout=60)
        except concurrent.futures.TimeoutError:
            print("cheking the internet connection")
            check_connection()
            continue
main()

我怎样才能让它工作或者我可以 运行 计时器中的线程池来做到这一点?

我猜你想要什么。该程序使用包 multiprocessing.pool 中的 class ThreadPool,因为正如我评论的那样,您可以通过终止池来终止处理时间过长的挂起线程。该程序使用方法 apply_async 提交单个任务,其中 return 是一个 AsyncResult,其 get 方法可以使用 timeout 参数调用从工作函数 printtTimeOut 异常中获取 return 值。对于每次成功完成,密码都会从原始密码集中删除,因此剩下的只是那些发生超时的密码。如果有任何这样的密码,池将终止,调用 check_connection 并且我们只针对上一次迭代失败的密码循环重新提交任务。

from multiprocessing.pool import ThreadPool, TimeoutError
import requests
import time

def check_connection():
    try:
        request = requests.get("http://google.com",timeout=10)
        print("Connected to the internet")
    except(requests.ConnectionError, requests.Timeout) as exception:
        print("No internet connection waiting for connection")
        try:
            request = requests.get("http://google.com",timeout=10)
            print("Connected to the internet")
        except(requests.ConnectionError, requests.Timeout) as exception:
            input("please fix the internet issue to continue")


def printt(x):
    print(x)


TIMEOUT = 60

def main():
    with open(passwd_wdlst, "r") as file:
        passwords = file.read().splitlines()
    MAX_THREADS = 30 # some suitable number
    passwords = set(passwords) # convert to a set
    while passwords: # as long as there are passwords left to processs
        n_threads = min(len(passwords), MAX_THREADS)
        pool = ThreadPool(n_threads)
        # create all the tasks
        results = {pool.apply_async(printt, args=(password,)): password for password in passwords}
        t_end = time.time() + TIMEOUT # everything needs to be done by this time
        for result, password in results.items():
            # wait for completion
            try:
                time_to_wait = max(0, t_end - time.time())
                return_value = result.get(timeout=time_to_wait) # return value from printt, which is None for now
            except TimeoutError:
                pass
            else:
                passwords.remove(password) # successfully completed
        # do we still have any uncompleted passwords?
        if passwords:
            pool.terminate()
            check_connection()

main()

谢谢 Boobo 的回答我还提出了另一个解决方案,它在循环中使用提交方法而不是映射函数,这是代码

import concurrent
import time 

def check_connection():
try:
    request= requests.get("http://google.com",timeout=10)
    print("Connected to the internet")
    pass
except(requests.ConnectionError, requests.Timeout) as exception:
    print("No internet connection waiting for connection")
    try:
        request= requests.get("http://google.com",timeout=10)
        print("Connected to the internet")
        pass
    except(requests.ConnectionError, requests.Timeout) as exception:
        input("please fix the internet issue to continue")

        
def printt(x): # print output
    print(f'this is number {x}')
    
def counter(x):  
    if time.time() > x:    # if true check connection      
        input('3[0;37;40mChecking the connection please wait  ...')
        check_connection()
        global t_end
        t_end = time.time() + 10

with open("wordlist.lst", "r") as file:
    passwds_wdlst = file.read().splitlines()

global t_end  # making the variable global
t_end = time.time() + 5  # counter to count here is 5 seconds

for x in range(100000,0,-1): 
    with concurrent.futures.ThreadPoolExecutor() as executor:
        counter(t_end) #checking if the countdown is finished 
        executor.submit(printt(x))

此外,另一个更好的解决方案是继承 map 方法并将计时器嵌入其中 像这样

import concurrent
import time 

class Myconcurrent(concurrent.futures.ThreadPoolExecutor):
    def map(self, fn , *iterables, timeout=None, chunksize=1):
        counter() #checking if the countdown is finished 
#         print("successfully subclassed")
        super().map(fn, *iterables, timeout=None, chunksize=1)
    
def printt(y , x):
    print(x)
    print(f'tried {y} for username {x}')
    
def counter():
#     print(global_count)
    global t_end
    if time.time() > t_end :            
        input('3[0;37;40mchanging the ip please wait ...')
#         time.sleep(3)
        t_end = time.time() + global_count
        
def main(count):
    with open("users.lst", "r") as file:
        usrs_wdlst = file.read().splitlines()

    with open("wordlist.lst", "r") as file:
        passwds_wdlst = file.read().splitlines()

    global global_count
    global_count = count

    global t_end
    t_end = time.time() + count 
    
    for user in usrs_wdlst:
        for x in range(100000,0,-1):
            with Myconcurrent() as executor:
#                 counter(t_end) #checking if the countdown is finished 
                executor.map(printt , [str(x)] , [str(user)] )
count=6   
main(count)