Issue storing data into BigQuery using a multi-threaded approach in Python

I'm implementing a Python script to fetch existing user data from a Google BigQuery database, perform some web-scraping functions for each user using a multi-threaded approach, and finally store the results in another table on BigQuery. There are around 3.6 million existing user records, and scraping each user can take up to 40 seconds. My goal is to be able to process 100,000 users per day, which is why I need a concurrent processing approach.
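A rough back-of-the-envelope check (assuming the 40-second worst case for every scrape) shows the minimum concurrency needed:

users_per_day = 100_000
seconds_per_scrape = 40          # worst case from above
seconds_per_day = 24 * 60 * 60   # 86,400

total_thread_seconds = users_per_day * seconds_per_scrape  # 4,000,000
min_concurrent_scrapes = total_thread_seconds / seconds_per_day  # ~46.3
print(f"Need roughly {min_concurrent_scrapes:.0f} scrapes running at all times")

So on the order of 50 scrapes need to be in flight at all times, assuming worst-case latency.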

I'm using the ThreadPoolExecutor from the concurrent.futures module. After a given number of threads finish their work, the executor is supposed to store the corresponding batch of results back to BigQuery. I can see the threads continuing to perform their web-scraping functions, but after a certain amount of time (or when using a large number of threads), they stop storing records back into the database.

At first I thought I was dealing with some race condition related to clearing the batch of results, but I've since implemented a BoundedSemaphore from the threading module as a lock, which I thought had solved the original problem. The results still aren't being stored back into the database reliably, though, so maybe I'm missing something?

I need help from someone with deep experience in concurrent processing in Python. Specifically, I'm running the script on a Heroku server, so Heroku experience might also be helpful. Thanks!! My code snippet is below:

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import BoundedSemaphore

service = BigQueryService() # a custom class defined elsewhere

users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    batch = []
    lock = BoundedSemaphore()
    futures = [executor.submit(user_with_friends, row) for row in users]
    print("FUTURE RESULTS", len(futures))
    for index, future in enumerate(as_completed(futures)):
        #print(index)
        result = future.result()

        # OK, so this locking business:
        # ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
        # ... restricts a thread's ability to acquire access to the batch until another one has released it
        with lock: # the context manager releases the semaphore even if an exception is raised
            batch.append(result)
            if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
                print("-------------------------")
                print(f"SAVING BATCH OF {len(batch)}...")
                print("-------------------------")
                service.append_user_friends(batch) # stores the results in another table on BigQuery
                batch = []
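For what it's worth, the as_completed() loop above runs entirely in the main thread (the worker threads only execute user_with_friends), so only one thread ever touches batch. Here's a minimal, self-contained sketch of the same pattern, with a hypothetical fake_scrape sleep standing in for the real scraping and a hypothetical fake_append standing in for the BigQuery write:

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

BATCH_SIZE = 5
MAX_THREADS = 10

def fake_scrape(row):
    time.sleep(random.uniform(0.01, 0.05))  # stand-in for the real web scraping
    return {"user_id": row, "friends": []}

def fake_append(batch):
    print(f"SAVING BATCH OF {len(batch)}")  # stand-in for the BigQuery write

users = list(range(23))

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    batch = []
    futures = [executor.submit(fake_scrape, row) for row in users]
    for index, future in enumerate(as_completed(futures)):
        batch.append(future.result())  # result() re-raises worker exceptions here
        if len(batch) >= BATCH_SIZE or index + 1 >= len(futures):
            fake_append(batch)
            batch = []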

See also:

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

https://docs.python.org/3.7/library/threading.html#threading.BoundedSemaphore

So I ended up using a different approach (see below) that is more reliable. Where the old approach coordinated between threads to store the results, the new approach has each thread process and store a single batch.

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread

def split_into_batches(all_users, batch_size=BATCH_SIZE):
    """Yield successive batch_size-sized slices of all_users. h/t: """
    for i in range(0, len(all_users), batch_size):
        yield all_users[i : i + batch_size]
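For example, a quick check of the generator's behavior (with a hypothetical batch size of 4):

print(list(split_into_batches(list(range(10)), batch_size=4)))
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]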

def process_and_save_batch(user_rows, bq): # generate_timestamp() is a custom helper defined elsewhere
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSING...")
    bq.append_user_friends([user_with_friends(user_row) for user_row in user_rows])
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSED BATCH OF", len(user_rows))
    return True

service = BigQueryService() # a custom class defined elsewhere

users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT)
print("FETCHED UNIVERSE OF", len(users), "USERS")

batches = list(split_into_batches(users))
print(f"ASSEMBLED {len(batches)} BATCHES OF {BATCH_SIZE}")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:

    futures = [executor.submit(process_and_save_batch, batch, service) for batch in batches]
    for future in as_completed(futures):
        future.result() # otherwise exceptions raised inside the threads are silently swallowed

When I significantly increase the thread count to something like 2500, the script almost completely stops storing results (I'd still like to investigate this behavior further), but I was able to run it with a relatively low thread count, and it's getting the job done.
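One way to keep the thread count low while still bounding how much work is queued up at once is to submit the batches in waves and check each wave's results before moving on. A minimal sketch, reusing batches, service, process_and_save_batch, and MAX_THREADS from the script above; the wave size of 10 * MAX_THREADS is an arbitrary assumption:

from concurrent.futures import ThreadPoolExecutor, as_completed

WAVE_SIZE = 10 * MAX_THREADS # hypothetical cap on batches in flight per wave

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    for start in range(0, len(batches), WAVE_SIZE):
        wave = batches[start : start + WAVE_SIZE]
        futures = [executor.submit(process_and_save_batch, b, service) for b in wave]
        for future in as_completed(futures):
            future.result() # fail fast if any batch in the wave raised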