Issue storing data into BigQuery using a multi-threaded approach in Python
I'm implementing a Python script to fetch existing user data from a Google BigQuery database, perform some web scraping functions for each user using a multi-threaded approach, and finally store the results in another table on BigQuery. There are roughly 3.6 million existing user records, and scraping a single user can take up to 40 seconds. My goal is to be able to process 100,000 users per day, which is why I need a concurrent processing approach.
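For context, here's a quick back-of-the-envelope calculation (assuming the 40-second worst case applies to every user, which is pessimistic) showing why sequential processing can't hit the daily target:

# Rough capacity math; assumes the 40s worst case for every user.
SECONDS_PER_USER = 40
USERS_PER_DAY = 100_000
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

total_work = SECONDS_PER_USER * USERS_PER_DAY  # 4,000,000 thread-seconds per day
min_workers = total_work / SECONDS_PER_DAY     # ~46 threads busy around the clock
print(f"Need roughly {min_workers:.0f} concurrent workers at minimum")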
I'm using the ThreadPoolExecutor from the concurrent.futures module. After a given number of threads finish their work, the executor is supposed to store the corresponding batch of results back in BigQuery. I can see the threads continuing to perform their web scraping functions, but after a certain amount of time (or when using a large number of threads), they stop storing records back in the database.
At first I thought I was dealing with a race condition around clearing the batch of results, but I've since implemented a BoundedSemaphore from the threading module for locking, which I believe resolved that original issue. However, the results still aren't being reliably stored back in the database, so maybe I'm missing something?
I could use help from someone with solid experience in concurrent processing in Python. Specifically, I'm running the script on a Heroku server, so Heroku experience might help too. Thanks!! My code snippet is below:
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import BoundedSemaphore

service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    batch = []
    lock = BoundedSemaphore()
    futures = [executor.submit(user_with_friends, row) for row in users]
    print("FUTURE RESULTS", len(futures))
    for index, future in enumerate(as_completed(futures)):
        #print(index)
        result = future.result()
        # OK, so this locking business:
        # ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
        # ... restricts a thread's ability to acquire access to the batch until another one has released it
        lock.acquire()
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
            print("-------------------------")
            print(f"SAVING BATCH OF {len(batch)}...")
            print("-------------------------")
            service.append_user_friends(batch) # stores the results in another table on BigQuery
            batch = []
        lock.release()
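(Side note: a plain threading.Lock used as a context manager would guarantee the lock gets released even if the BigQuery call raises. Here's a minimal sketch of that variant, which I haven't run against the real workload:)

from threading import Lock

batch = []
batch_lock = Lock()
for index, future in enumerate(as_completed(futures)):
    result = future.result()
    with batch_lock:  # released automatically, even if an exception is raised
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)):
            service.append_user_friends(batch)
            batch = []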
See also:
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
https://docs.python.org/3.7/library/threading.html#threading.BoundedSemaphore
So, I ended up going with a different approach that is more reliable (see below). Whereas the old approach coordinated between threads to store the results, in the new approach each thread processes and stores its own batch.
from threading import current_thread

def split_into_batches(all_users, batch_size=BATCH_SIZE):
    """h/t: """
    for i in range(0, len(all_users), batch_size):
        yield all_users[i : i + batch_size]

def process_and_save_batch(user_rows, bq):
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSING...")
    bq.append_user_friends([user_with_friends(user_row) for user_row in user_rows])
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSED BATCH OF", len(user_rows))
    return True
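(For illustration, here's how the generator chunks a list, using toy data and a batch size of 3:)

demo_users = list(range(10))
for chunk in split_into_batches(demo_users, batch_size=3):
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]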
service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT)
print("FETCHED UNIVERSE OF", len(users), "USERS")

batches = list(split_into_batches(users))
print(f"ASSEMBLED {len(batches)} BATCHES OF {BATCH_SIZE}")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    for batch in batches:
        executor.submit(process_and_save_batch, batch, service)
When I significantly increase the thread count to something like 2,500, the script almost completely stops storing results (I'd still like to investigate this behavior further), but I was able to run it with a relatively low thread count, and it's getting the job done.
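One thing I still want to check: executor.submit() returns futures that silently capture any exception raised inside process_and_save_batch, so a failing batch would never show up in the logs. A small untested sketch of collecting the futures so failures surface on the main thread:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    futures = [executor.submit(process_and_save_batch, batch, service) for batch in batches]
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread
        except Exception as err:
            print("BATCH FAILED:", err)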