多处理查询文件中的新行

Multiprocessing queries new lines from a file

我正在尝试使用多处理来加速 times.The 目标是;进程将查询到文本文件中定义的域。执行时;多进程只是做同样的事情:每个进程从第一行开始查询,而不是每个进程换行。所以主要目标;每个进程从源 .txt 查询 new lines 中列出的域。 这是使用的代码:

class diginfo:
    expected_response = 101
    control_domain = 'd2f99r5bkcyeqq.cloudfront.net'
    payloads = { "Host": control_domain, "Upgrade": "websocket", "DNT":  "1", "Accept-Language": "*", "Accept": "*/*", "Accept-Encoding": "*", "Connection": "keep-alive, upgrade", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.66 Safari/537.36" }
    file_hosts = ""
    result_success = []
    num_file = 1
    columns = defaultdict(list)
    txtfiles= []
    hostpath = 'host'

def engines(counts, terminate, reach):
    for domain in domainlist:
        try:
            r = requests.get("http://" + domain, headers=headers, timeout=0.7, allow_redirects=False)
            if r.status_code == expected_response:
                print("Success" + domain)
                print(domain, file=open("RelateCFront.txt", "a"))
                result_success.append(str(domain))
            elif r.status_code != expected_response:
                print("Failed" + domain + str(r.status_code))

    print(" Loaded : "  + str(len(diginfo.result_success)))
    if len(diginfo.result_success) >= 0:
        print(" Successfull Result : ")
    for result in diginfo.result_success:
        print("  " + result)
    print("")
    while not terminate.is_set():
        reach.set()
        break

def fromtext():
        global headers, domainlist
        files = os.listdir(diginfo.hostpath)
        for f in files:
            if fnmatch.fnmatch(f, '*.txt'):
                print( str(diginfo.num_file),str(f))
                num_file=diginfo.num_file+1
                diginfo.txtfiles.append(str(f))

        fileselector = input("Choose Target Files : ")
        print("Target Chosen : " + diginfo.txtfiles[int(fileselector)-1])
        file_hosts = str(diginfo.hostpath) +"/"+ str(diginfo.txtfiles[int(fileselector)-1])

        with open(file_hosts) as f:
            parseddom = f.read().split()
            
        domainlist = list(set(parseddom))
        domainlist = list(filter(None, parseddom))

        terminate = Event()
        reach = Event()
        for counts in range(cpu_count()):
            p = Process(target=engines, args=(counts, terminate, reach))
            p.start()
        reach.wait()
        terminate.set()
        sleep(3)
        exit()

fromtext()

这是我所做的:

for domain in domainlist:
    p = Process(target=engines, args=(domainlist, terminate, reach))
    p.start()

它似乎不会响应并导致 0 个结果和无限进程。我无法传递 counts 参数,因为它只接受 3 个参数。 TerminateReach 用于在达到要求后发出信号。

Problematic Code

Problematic Screenshot

您需要将 domainlist 分成 cpu_count() 个部分,并将每个部分传递给不同的进程。 您还错误地使用了事件:目前,它会在任何进程完成后 3 秒退出,而不管其他进程是否仍在工作。 您应该改用 Barrier,或者在 fromtext():

中的每个进程上调用 join()
def engines(domainsublist):
    for domain in domainsublist:
        ...

def fromtext():
    ...
    num_cpus = cpu_count()
    processes = []
    for process_num in range(num_cpus):
        section = domainlist[process_num::num_cpus]
        p = Process(target=engines, args=(section,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

最后,您在 engines() 中遇到了一些竞争条件:当您写入 RelateCFront.txt 和追加到 diginfo.result_success 时。 SO 上有很多好的解决方案;我不会在这里尝试修复它们。