concurrent.futures multithreading with 2 lists as variables

So I want to multithread the following working code with concurrent.futures, but nothing I have tried so far seems to work.

import sys

import requests

# login_url, payload, headers and download_headers are defined elsewhere.

def download(song_filename_list, song_link_list):

    with requests.Session() as s:

        login_request = s.post(login_url, data=payload, headers=headers)

        for x in range(len(song_filename_list)):

            download_request = s.get(song_link_list[x], headers=download_headers, stream=True)

            if download_request.status_code == 200:
                print(f"Downloading {x+1} out of {len(song_filename_list)}!\n")
            else:
                print(f"\nStatus Code: {download_request.status_code}!\n")
                sys.exit()

            with open(song_filename_list[x], "wb") as file:
                file.write(download_request.content)

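The 2 main variables are song_filename_list and song_link_list.

The first list contains the name of each file, and the second contains the corresponding download links.
So each file's name and link are at the same position in their lists.
For example: name_of_file1 = song_filename_list[0] and link_of_file1 = song_link_list[0]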
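Pairing two lists by position is exactly what `zip()` does, so the index bookkeeping can be dropped. A minimal sketch with made-up sample data (the file names and example.com links are placeholders, not the real values):

```python
# Hypothetical sample data; in the question these lists come from elsewhere.
song_filename_list = ["song1.mp3", "song2.mp3", "song3.mp3"]
song_link_list = [
    "https://example.com/1",
    "https://example.com/2",
    "https://example.com/3",
]


def pair_songs(names, links):
    """Return (filename, link) pairs, matched by position."""
    if len(names) != len(links):
        raise ValueError("lists must have the same length")
    # zip() yields names[0] with links[0], names[1] with links[1], ...
    return list(zip(names, links))


for filename, link in pair_songs(song_filename_list, song_link_list):
    print(f"{filename} <- {link}")
```

The explicit length check is there because `zip()` silently stops at the shorter list, which would skip files without any warning.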


Here is my latest attempt at multithreading it:

def download(song_filename_list, song_link_list):

    with requests.Session() as s:
    
        login_request = s.post(login_url, data= payload, headers= headers)

        x = []
        for i in range(len(song_filename_list)):
            x.append(i)


        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(get_file, x)


def get_file(x):
    
    download_request = s.get(song_link_list[x], headers= download_headers, stream=True)

    if download_request.status_code == 200:
        print(f"Downloading {x+1} out of {len(song_filename_list)}!\n")
        pass
    else:
        print(f"\nStatus Code: {download_request.status_code}!\n")
        sys.exit()

        
    with open (song_filename_list[x], "wb") as file:
        file.write(download_request.content)

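Can someone explain to me what I am doing wrong?
Nothing happens after the get_file function is called.
It skips all the code and exits without any error, so where is my logic wrong?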
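A likely reason for the silence: an exception raised inside a task passed to `executor.submit()` is stored in the returned `Future` and only re-raised when `.result()` is called (or returned by `.exception()`). In the attempt above, `get_file` receives the whole index list `x`, and `s` is local to `download()`, so the call fails (first with a `NameError` for `s`; with that fixed, indexing `song_link_list` with a list raises a `TypeError`), but nothing ever inspects the future. The sketch below reproduces this offline; `get_link` is a hypothetical stand-in for `get_file`:

```python
import concurrent.futures

# Hypothetical sample data standing in for the question's list.
song_link_list = ["https://example.com/a.mp3", "https://example.com/b.mp3"]


def get_link(x):
    # Same indexing as the question's get_file(x): fine for an int,
    # TypeError when x is a list.
    return song_link_list[x]


def submit_whole_list():
    indices = list(range(len(song_link_list)))
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # ONE task whose single argument is the entire list.
        future = executor.submit(get_link, indices)
    # The with-block exits without an error: the TypeError sits in the Future.
    return future.exception()


def submit_each_index():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(get_link, i) for i in range(len(song_link_list))]
        # result() returns each value, or re-raises that task's exception here.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(repr(submit_whole_list()))
    print(submit_each_index())
```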


Edit 1:

After adding prints like this:

print(song_filename_list, song_link_list)
with concurrent.futures.ThreadPoolExecutor() as executor:
    print("Before executor.map")
    executor.map(get_file, zip(song_filename_list, song_link_list))
    print("After executor.map")
    print(song_filename_list, song_link_list)

as well as at the start and end of get_file and at its file.write,

the output is as follows:


Succesfully logged in!

["songs names"] ["songs links"]    <- These are correct.
Before executor.map
After executor.map
["songs names"] ["songs links"]    <- These are correct.

Exiting.

In other words, the values are correct, but get_file inside executor.map is skipped.
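`executor.map(fn, iterable)` calls `fn` with one item per call, so with `zip(...)` each call receives a single `(name, link)` tuple. A one-parameter `get_file(x)` then indexes the lists with a tuple, and a two-parameter version is missing an argument; either way it is a `TypeError`. `map()` also returns a lazy iterator that re-raises a worker's exception only when its results are consumed, and the Edit 1 code never consumes them. A small offline sketch (the data and this `get_file` are hypothetical stand-ins):

```python
import concurrent.futures

names = ["a.mp3", "b.mp3"]  # hypothetical sample data
links = ["https://example.com/a", "https://example.com/b"]


def get_file(song_name, song_link):
    # Stand-in for the real download; just reports what it received.
    return f"{song_name} from {song_link}"


def with_zip():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Each call becomes get_file(("a.mp3", "https://...")): ONE argument.
        results = executor.map(get_file, zip(names, links))
        try:
            return list(results)  # the TypeError only surfaces here
        except TypeError as exc:
            return exc


def with_two_iterables():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # map(fn, a, b) calls fn(a[0], b[0]), fn(a[1], b[1]), ...
        return list(executor.map(get_file, names, links))


if __name__ == "__main__":
    print(repr(with_zip()))
    print(with_two_iterables())
```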


Edit 2:

Here are the values used.


Edit 3:

After some tinkering, this seems to work.

for i in range(len(song_filename_list)):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.submit(get_file, song_filename_list, song_link_list, i, s)
def get_file(song_filename_list, song_link_list, i, s):
    
    download_request = s.get(song_link_list[i], headers= download_headers, stream=True)

    if download_request.status_code == 200:
        print("Downloading...")
        pass
    else:
        print(f"\nStatus Code: {download_request.status_code}!\n")
        sys.exit()
    
    with open (song_filename_list[i], "wb") as file:
        file.write(download_request.content)
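One caveat about this version: leaving a `with ThreadPoolExecutor()` block calls `shutdown(wait=True)`, so creating the pool inside the loop waits for each download to finish before the next one is submitted, and the files are fetched one at a time. The sketch below measures the peak number of tasks running at once in both layouts; `ConcurrencyTracker` and the 0.05 s sleep are hypothetical stand-ins for real downloads:

```python
import concurrent.futures
import threading
import time


class ConcurrencyTracker:
    """Records the highest number of tasks running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = 0
        self.peak = 0

    def task(self, _i):
        with self._lock:
            self._running += 1
            self.peak = max(self.peak, self._running)
        time.sleep(0.05)  # stand-in for a download
        with self._lock:
            self._running -= 1


def pool_per_iteration(n):
    tracker = ConcurrencyTracker()
    for i in range(n):
        # Leaving this with-block calls shutdown(wait=True), so each task
        # finishes before the next one is even submitted.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(tracker.task, i)
    return tracker.peak


def single_pool(n):
    tracker = ConcurrencyTracker()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        futures = [executor.submit(tracker.task, i) for i in range(n)]
        for f in futures:
            f.result()  # re-raise any worker exception
    return tracker.peak


if __name__ == "__main__":
    print("pool per iteration:", pool_per_iteration(4))
    print("single pool:", single_pool(4))
```

The fix is simply to create the executor once, outside the loop, and submit every task to it.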

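In your download() function you submit the whole index list as a single task, when you should submit each item separately. That single task fails inside the worker thread, and because the exception is stored in the Future and never retrieved with .result(), nothing is printed: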

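def download(song_filename_list, song_link_list):
    with requests.Session() as s:
        login_request = s.post(login_url,
            data=payload,
            headers=headers)

        # One pool for all files (get_file as defined in Edit 3).
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(get_file, song_filename_list, song_link_list, i, s)
                       for i in range(len(song_filename_list))]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raise any exception from get_file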

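You can simplify this with the executor's .map() method, which pairs the two lists by position. Because each worker below opens its own Session, the login cookies are copied over explicitly (this assumes the site authenticates with session cookies):

import concurrent.futures
import functools
import requests

def download(song_filename_list, song_link_list):
  with requests.Session() as session:
    session.post(login_url, data=payload, headers=headers)
    # Copy the login cookies (assumes cookie-based auth) for the workers.
    cookies = session.cookies.get_dict()

  worker = functools.partial(get_file, cookies=cookies)
  with concurrent.futures.ThreadPoolExecutor() as executor:
    # list() consumes the results so worker exceptions are re-raised here.
    list(executor.map(worker, song_filename_list, song_link_list))

where the get_file function is:

def get_file(song_name, song_link, cookies):
  # A new Session per call: no mutable state is shared between threads.
  with requests.Session() as session:
    download_request = session.get(song_link,
        headers=download_headers,
        cookies=cookies)

    if download_request.status_code != 200:
      print(f"\nStatus Code: {download_request.status_code}!\n")
      return  # don't save an error page as the song

    with open(song_name, "wb") as file:
      file.write(download_request.content)
    print(f"Downloaded {song_name}")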

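Because each get_file call creates its own Session, no mutable state is shared between threads, which avoids potential data races.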
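To try the `map()` pattern without a server, the HTTP call can be swapped for a stub. In this sketch `fake_fetch` is hypothetical (it returns a status code and a body instead of a `requests` response), and files are written into a caller-supplied directory:

```python
import concurrent.futures
import functools
import os
import tempfile


def fake_fetch(link):
    """Hypothetical stand-in for session.get(...): returns (status, body)."""
    if link.endswith("/missing"):
        return 404, b""
    return 200, f"contents of {link}".encode()


def get_file(song_name, song_link, out_dir, fetch=fake_fetch):
    status, body = fetch(song_link)
    if status != 200:
        # Report and skip: don't write an error page to disk.
        return f"{song_name}: status {status}"
    with open(os.path.join(out_dir, song_name), "wb") as file:
        file.write(body)
    return f"{song_name}: ok"


def download(song_filename_list, song_link_list, out_dir):
    worker = functools.partial(get_file, out_dir=out_dir)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map() pairs the lists by position; list() waits for every task and
        # re-raises the first worker exception, if any.
        return list(executor.map(worker, song_filename_list, song_link_list))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        for line in download(["a.mp3", "b.mp3"],
                             ["https://example.com/a", "https://example.com/missing"],
                             out_dir):
            print(line)
```

`executor.map()` returns results in input order, regardless of which download finishes first.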

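If you need to track how many songs have been downloaded, you can use tqdm, whose thread_map wrapper (in tqdm.contrib.concurrent) runs the function in a thread pool and shows a progress bar.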
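If you would rather avoid the extra dependency, `concurrent.futures.as_completed()` gives the same running count with the standard library alone. A minimal sketch; `count_progress` and its `report` callback are hypothetical names:

```python
import concurrent.futures


def count_progress(fn, names, links, report=print):
    """Run fn(name, link) in a pool and report 'k out of n' as tasks finish."""
    total = len(names)
    done = 0
    results = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_name = {
            executor.submit(fn, name, link): name
            for name, link in zip(names, links)
        }
        # as_completed() yields each future as soon as it finishes,
        # in completion order rather than submission order.
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            results[name] = future.result()  # re-raises worker exceptions
            done += 1
            report(f"Downloaded {done} out of {total}: {name}")
    return results


if __name__ == "__main__":
    count_progress(lambda name, link: len(link), ["a", "b"], ["x", "yy"])
```

The counter is only updated in the main thread (inside the `as_completed` loop), so it needs no lock.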