如何多线程从列表中读取字典并进入数据库
How to multithread reading dictionaries from a list and entering into database
我正在尝试对以下代码进行多线程处理,但我似乎无法使其正常工作。
以下代码(出于说明目的,我删除了大部分代码)目前运行流畅,但速度较慢(3600 条推文的列表大约需要 5 分钟)。
import dataset
import datetime
import json
with open("postgresConnecString.txt", 'r') as f:
DB_CONNECTIONSTRING = f.readline()
DB = dataset.connect(DB_CONNECTIONSTRING)
def load_tweet(tweet, tweets_saved):
"""Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database"""
try:
data = {'tweet_id': tweet['tweet_id',
'tweet_json': json.dumps(tweet)} # Dictionary that contains the data I need from the tweet
DB['tweets'].upsert(data, ['tweet_id'])
tweets_saved += 1
if tweets_saved % 100 == 0:
print('Saved ' + str(tweets_saved) + ' tweets')
return tweets_saved
except KeyError:
return tweets_saved
if __name__ == "__main__":
tweets['tweet1', 'tweet2']
for tweet in tweets:
tweets_saved = load_tweet(tweet, tweets_saved)
因此,我一直在寻找执行此多线程的选项。但是,我还没有找到一种方法可以:
- 多线程提取过程;
- 每 100、500 或 1000 条推文打印一个计数器;
通过 this tutorial 还没有让我理解这样做:每个线程的 class 的概念,我需要在 class 中自定义的内容和目前实现一个队列对我来说掌握了很多;我才刚刚起步。
- 有人可以提供有关如何使用多线程合并上述脚本的提示吗?
- 我应该使用多少线程? Python 目前在运行脚本时使用了我的 CPU 的 ~1% 和 RAM 的 ~10%(我的 system specs)
- 如何处理递增计数器(使用 Lock()?),并在达到计数器 % 100 时打印?
编辑:根据要求:这里是探查器结果的主要部分(dataset.upsert):
ncalls tottime percall cumtime percall filename:lineno(function)
5898 245.133 0.042 245.133 0.042 :0(_connect)
5898 12.137 0.002 12.206 0.002 :0(execute)
这是用 'dataset.insert' 代替 'dataset.upsert' 的第二次尝试:
1386332 function calls (1382960 primitive calls) in 137.255 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
2955 122.646 0.042 122.646 0.042 :0 (_connect)
最后(绝对不是最不重要的),这是运行原始 psycopg2 代码的时间。
63694 function calls (63680 primitive calls) in 2.203 seconds
最后,不要使用数据集来提高性能(尽管编写 psycopg2 代码花了我 10 分钟,这是 >> dataset.upsert 的 10 秒)
- 现在,回到原来的问题。我可以通过多线程进一步减少每个文件约 2 秒吗?怎么样?
可以找到完整代码here
我不知道你是否能够提高性能。但至于我认为你会想要什么 concurrent.futures.Executor.map。 ProcessPoolExecutor 而不是 ThreadPoolExecutor 应该是你想要的,虽然我不是专家。
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
如果您想显示进度,请查看同一模块中的 concurrent.futures.as_completed。
几个可以改进的地方:
运行 整个批次一次交易。使用事务意味着数据库不需要在每次写入时实际提交(将数据写入磁盘),而是可以在内存中缓冲未提交的数据。这通常会导致更有效的资源使用。
在 tweet_id 上添加唯一索引。如果没有唯一索引,您可能会强制数据库对每个更新插入进行顺序扫描,这会导致批量更新扩展 O(n**2)。
拆分插入和更新,尽可能使用 .insert_many() 而不是 .upsert()。在执行批量更新插入之前,您执行飞行前查询以找出存在于数据库和推文列表中的 tweet_id 列表。使用 .insert_many() 插入数据库中尚不存在的项目,并使用 .update() 插入已存在的项目。
我正在尝试对以下代码进行多线程处理,但我似乎无法使其正常工作。
以下代码(出于说明目的,我删除了大部分代码)目前运行流畅,但速度较慢(3600 条推文的列表大约需要 5 分钟)。
import dataset
import datetime
import json
with open("postgresConnecString.txt", 'r') as f:
DB_CONNECTIONSTRING = f.readline()
DB = dataset.connect(DB_CONNECTIONSTRING)
def load_tweet(tweet, tweets_saved):
"""Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database"""
try:
data = {'tweet_id': tweet['tweet_id',
'tweet_json': json.dumps(tweet)} # Dictionary that contains the data I need from the tweet
DB['tweets'].upsert(data, ['tweet_id'])
tweets_saved += 1
if tweets_saved % 100 == 0:
print('Saved ' + str(tweets_saved) + ' tweets')
return tweets_saved
except KeyError:
return tweets_saved
if __name__ == "__main__":
tweets['tweet1', 'tweet2']
for tweet in tweets:
tweets_saved = load_tweet(tweet, tweets_saved)
因此,我一直在寻找执行此多线程的选项。但是,我还没有找到一种方法可以:
- 多线程提取过程;
- 每 100、500 或 1000 条推文打印一个计数器;
通过 this tutorial 还没有让我理解这样做:每个线程的 class 的概念,我需要在 class 中自定义的内容和目前实现一个队列对我来说掌握了很多;我才刚刚起步。
- 有人可以提供有关如何使用多线程合并上述脚本的提示吗?
- 我应该使用多少线程? Python 目前在运行脚本时使用了我的 CPU 的 ~1% 和 RAM 的 ~10%(我的 system specs)
- 如何处理递增计数器(使用 Lock()?),并在达到计数器 % 100 时打印?
编辑:根据要求:这里是探查器结果的主要部分(dataset.upsert):
ncalls tottime percall cumtime percall filename:lineno(function)
5898 245.133 0.042 245.133 0.042 :0(_connect)
5898 12.137 0.002 12.206 0.002 :0(execute)
这是用 'dataset.insert' 代替 'dataset.upsert' 的第二次尝试:
1386332 function calls (1382960 primitive calls) in 137.255 seconds
ncalls tottime percall cumtime percall filename:lineno(function)
2955 122.646 0.042 122.646 0.042 :0 (_connect)
最后(绝对不是最不重要的),这是运行原始 psycopg2 代码的时间。
63694 function calls (63680 primitive calls) in 2.203 seconds
最后,不要使用数据集来提高性能(尽管编写 psycopg2 代码花了我 10 分钟,这是 >> dataset.upsert 的 10 秒)
- 现在,回到原来的问题。我可以通过多线程进一步减少每个文件约 2 秒吗?怎么样?
可以找到完整代码here
我不知道你是否能够提高性能。但至于我认为你会想要什么 concurrent.futures.Executor.map。 ProcessPoolExecutor 而不是 ThreadPoolExecutor 应该是你想要的,虽然我不是专家。
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
如果您想显示进度,请查看同一模块中的 concurrent.futures.as_completed。
几个可以改进的地方:
运行 整个批次一次交易。使用事务意味着数据库不需要在每次写入时实际提交(将数据写入磁盘),而是可以在内存中缓冲未提交的数据。这通常会导致更有效的资源使用。
在 tweet_id 上添加唯一索引。如果没有唯一索引,您可能会强制数据库对每个更新插入进行顺序扫描,这会导致批量更新扩展 O(n**2)。
拆分插入和更新,尽可能使用 .insert_many() 而不是 .upsert()。在执行批量更新插入之前,您执行飞行前查询以找出存在于数据库和推文列表中的 tweet_id 列表。使用 .insert_many() 插入数据库中尚不存在的项目,并使用 .update() 插入已存在的项目。