多线程锁定

Lock with Multi-threading

嘿,相信你们都很好,我目前正在尝试让每个 进程(线程) 读取 不同的 txt 文件或从 一个文本文件 中读取,其中每个进程分配了一定数量。

示例: 如果 txt 文件包含 20 个用户名,处理一个 消息到文本文件中指定的第一个 10 用户和 处理两个 消息到其他 10 用户文本文件。

问题:我如何读取一个文本文件中的 10 行,删除 10 行,并在创建的每个进程中读取接下来的 10 行,假设该文件有 20 行?

读取指定量

with open("test.txt", "r") as fp:
    for linenr, line in enumerate(fp):
        if linenr > 9:
            break
        elif linenr >= 0:
            print(line)

删除指定数量

with open("test.txt", 'r+') as fp:
    # read an store all lines into list
    lines = fp.readlines()
    # move file pointer to the beginning of a file
    fp.seek(0)
    # truncate the file
    fp.truncate()

    # start writing lines except the first line
    # lines[1:] from line 2 to last line
    fp.writelines(lines[10:])

代码:

import time
from selenium import webdriver
import threading
import json

def test_instance(data):
    Options = webdriver.ChromeOptions()
    mobile_emulation = {"userAgent": "Mozilla/5.0 (Linux; Android 4.2.1; en-us; Nexus 5 Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/101.0.4951.64 Mobile Safari/535.19"}
    Options.add_experimental_option("mobileEmulation", mobile_emulation)
    Options.add_argument("--log-level=3")

    bot = webdriver.Chrome(options=Options, executable_path="chromedriver.exe")
    bot.set_window_size(500, 768)
    bot.get("https://www.instagram.com/")
    
    time.sleep(10)

    # Login section==========================
    print('Logging in...')
    bot.find_element_by_xpath('//*[@id="react-root"]/section/main/article/div/div/div/div[3]/button[1]').click()
    time.sleep(5)
    username_field = bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[3]/div/label/input')
    username_field.send_keys(data['username'])
    time.sleep(5)
    password_field = bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[4]/div/label/input')
    password_field.send_keys(data['password'])
    time.sleep(5)
    bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[6]/button').click()
    time.sleep(6)
    
    bot.quit()

f = open('accounts.json',)
data = json.load(f)
f.close()
process_count = 2 # number of tests to run (each test open a separate browser)
thread_list = []

# Start test
for i in range(process_count):
    t = threading.Thread(name=f'Test {i}', target=test_instance, args=[data[i]])
    t.start()
    time.sleep(1)
    print(t.name + ' started')
    thread_list.append(t)

# Wait for all threads to complete
for thread in thread_list:
    thread.join()

print('Test completed')

尝试同时通过不同的线程读取和写入同一个文件通常是一个非常糟糕的主意,尤其是当您同时查找和截断文件时。除非您使用 Lock 来序列化访问,否则您无法确定哪个数据 read/written 的顺序是什么。

为了简单起见,我建议让主程序将输入文件读取到列表中。 给每个 thead 一份该列表中的内容以供其采取行动。


读取用户列表的示例:

# Read file into list:
with open("users.txt") as uf:
    users = [ln.strip() for ln in tf if ln[0] not in '\r\n']

让我们分解这段代码(它被称为列表理解):

for ln in tf

遍历文件中的行。

if ln[0] not in '\r\n'

这将跳过空行。

ln.strip()

这会删除例如换行符和回车符 returns.

注意 with-语句完成后,uf是一个关闭的文件,所以你再也无法阅读它了。


为要迭代的线程创建对

假设numusers(即len(users))是34。

numusers = 34

然后我们可以像这样创建一个配对列表:

im = [n for n in range(numusers+1) if n % 10 == 0 or n == numusers]

这将生成列表 [0, 10, 20, 30, 34]

现在创建对:

pairs = list(zip(im[:-1], im[1:]))

那么pairs就是[(0, 10), (10, 20), (20, 30), (30, 34)]


使用线程池执行器

然后您可以编写一个函数,将像 (0, 10) 这样的二元组作为参数,并为每个用户做一些事情。

import concurrent.futures as cf


def target(pair):
    first, last = pair
    for user in users[first, last]:
        # do the whole login thing
    # You should probably at least return a success or error code.
    return f"users {users[first]} to {users[last-1]} processed"

with cf.ThreadPoolExecutor(max_workers=2) as exec:
    results = exec.map(target, pairs)

我建议让每个线程将它想要写入的数据放在一个列表中。当所有工作线程都完成后,连接列表,然后将它们写入主线程的输出文件。

或者,您可以从每个线程写入,但您必须使用 Lock 保护文件访问,这样您就不会有多个进程同时尝试写入同一个文件。

要记住的另一件事是 chrome 不是轻量级软件! 运行 同时出现太多实例可能会使您的 PC 过载或网络连接饱和。