大型(ish)数据集上的 Django 数据库操作缓慢。
Slow django database operations on large (ish) dataset.
我设置了一个系统来过滤 Twitter 实时流示例。显然,数据库写入速度太慢,跟不上比几个低容量关键字更复杂的事情。我将 django-rq 实现为一个简单的队列系统,在推文进入时将推文推送到基于 redis 的队列中,效果很好。我的问题在另一边。这个问题的背景是我现在有一个 运行 的系统,有 150 万条推文供分析,另外 375,000 条通过 Redis 排队。以目前的性能速度,如果我关闭流媒体,我将需要大约 3 天的时间才能赶上进度,但我不想这样做。如果我维护这些流,那么根据我最后的估计,大约需要一个月的时间。
数据库现在有几百万行横跨两个主表,写入速度很慢。 rq-worker 的最佳数量似乎是四个,平均每秒 1.6 个队列任务。 (下面排队的代码)。我认为问题可能是为每个新队列任务打开数据库连接,所以将 CONN_MAX_AGE 设置为 60,但这并没有改善任何东西。
刚刚在本地主机上进行了测试,我得到了超过 13 个 writes/second,在 Macbook 2011 上 Chrome,等等 运行,但只有少数该数据库中有数千行,这让我相信它与大小有关。我正在使用几个 get_or_create
命令(见下文),这可能会减慢速度,但无法通过使用它们看到任何其他方式 - 我需要检查用户是否存在,并且我需要检查推文是否已经存在(我怀疑,我可能会将后者移至 try/except,基于来自直播流的推文不应该已经存在,原因很明显。)我从中获得了很多性能提升?由于这仍然是 运行,我很想稍微优化一下代码并在里面找一些 faster/more 高效的工作人员,这样我就可以赶上了! 运行 预审工作人员可以批量处理吗? (也就是说,我可以批量创建不存在的用户,或类似的东西?)
我是 运行 数字海洋上的 4 Core/8Gb Ram 液滴,所以感觉这是一些非常糟糕的性能,并且可能与代码有关。我哪里错了?
(我在这里发布了这个而不是代码审查,因为我认为这与 SO 的问答格式相关,因为我正在尝试解决特定的代码问题,而不是 'how can I do this generally better?')
注意: 我在 django 1.6 中工作,因为这是我已经使用了一段时间的代码,当时对升级没有信心 - 它是不是 public 面对,所以除非现在有令人信服的理由(比如这个性能问题),否则我不会升级(对于这个项目)。
流监听器:
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
# Twitter returns data in JSON format - we need to decode it first
decoded = json.loads(data)
#print type(decoded), decoded
# Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users
try:
if decoded['lang'] == 'en':
django_rq.enqueue(read_both, decoded)
else:
pass
except KeyError,e:
print "Error on Key", e
except DataError, e:
print "DataError", e
return True
def on_error(self, status):
print status
阅读User/Tweet/Both
def read_user(tweet):
from harvester.models import User
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
#We might get weird results where user has changed their details"], so first we check the UID.
#print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
try:
current_user = User.objects.get(id_str=tweet["user"]["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
return current_user, False
if not tweet["user"]["follow_request_sent"]:
tweet["user"]["follow_request_sent"] = False
if not tweet["user"]["following"]:
tweet["user"]["following"] = False
if not tweet["user"]["description"]:
tweet["user"]["description"] = " "
if not tweet["user"]["notifications"]:
tweet["user"]["notifications"] = False
#If that doesn't work"], then we'll use get_or_create (as a failback rather than save())
from dateutil.parser import parse
if not tweet["user"]["contributors_enabled"]:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = False,
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
else:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = tweet["user"]["contributers_enabled"],
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
#print "CURRENT USER:""], type(current_user)"], current_user
#current_user"], created = User.objects.get_or_create(current_user)
return current_user, created
def read_tweet(tweet, current_user):
import logging
logger = logging.getLogger('django')
from datetime import date, datetime
#print "Inside read_Tweet"
from harvester.models import Tweet
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.db import DataError
#We might get weird results where user has changed their details"], so first we check the UID.
#print tweet_data["created_at"]
from dateutil.parser import parse
tweet["created_at"] = parse(tweet["created_at"])
try:
#print "trying tweet_data["id"
current_tweet =Tweet.objects.get(id_str=tweet["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_tweet =Tweet.objects.filter(id_str=tweet["id_str"])[0]
try:
current_tweet, created = Tweet.objects.get_or_create(
truncated=tweet["truncated"],
text=tweet["text"],
favorite_count=tweet["favorite_count"],
author = current_user,
_json = {},
source=tweet["source"],
retweeted=tweet["retweeted"],
coordinates = tweet["coordinates"],
entities = tweet["entities"],
in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
id_str = tweet["id_str"],
retweet_count = tweet["retweet_count"],
favorited = tweet["favorited"],
user = tweet["user"],
geo = tweet["geo"],
in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
lang = tweet["lang"],
created_at = tweet["created_at"],
place = tweet["place"])
print "DEBUG", current_user, current_tweet
return current_tweet, created
except DataError, e:
#Catchall to pick up non-parsed tweets
print "DEBUG ERROR", e, tweet
return None, False
def read_both(tweet):
current_user, created = read_user(tweet)
current_tweet, created = read_tweet(tweet, current_user)
我最终设法拼凑了一些 redditor 的答案和其他一些东西。
从根本上说,虽然我在 id_str 字段上进行了双重查找,但该字段未编入索引。我在 read_tweet
和 read_user
上向该字段添加了索引 db_index=True
,并将阅读推文移动到 try/except Tweet.objects.create
方法,回落到 get_or_create 如果出现问题,并且速度提高了 50-60 倍,现在可以扩展工作人员 - 如果我添加 10 名工作人员,我将获得 10 倍的速度。
我目前有一名员工每秒可以愉快地处理 6 条左右的推文。接下来,我将添加一个监控守护进程来检查队列大小,如果它仍在增加,则添加额外的工作人员。
tl;dr - 记住索引!
我设置了一个系统来过滤 Twitter 实时流示例。显然,数据库写入速度太慢,跟不上比几个低容量关键字更复杂的事情。我将 django-rq 实现为一个简单的队列系统,在推文进入时将推文推送到基于 redis 的队列中,效果很好。我的问题在另一边。这个问题的背景是我现在有一个 运行 的系统,有 150 万条推文供分析,另外 375,000 条通过 Redis 排队。以目前的性能速度,如果我关闭流媒体,我将需要大约 3 天的时间才能赶上进度,但我不想这样做。如果我维护这些流,那么根据我最后的估计,大约需要一个月的时间。
数据库现在有几百万行横跨两个主表,写入速度很慢。 rq-worker 的最佳数量似乎是四个,平均每秒 1.6 个队列任务。 (下面排队的代码)。我认为问题可能是为每个新队列任务打开数据库连接,所以将 CONN_MAX_AGE 设置为 60,但这并没有改善任何东西。
刚刚在本地主机上进行了测试,我得到了超过 13 个 writes/second,在 Macbook 2011 上 Chrome,等等 运行,但只有少数该数据库中有数千行,这让我相信它与大小有关。我正在使用几个 get_or_create
命令(见下文),这可能会减慢速度,但无法通过使用它们看到任何其他方式 - 我需要检查用户是否存在,并且我需要检查推文是否已经存在(我怀疑,我可能会将后者移至 try/except,基于来自直播流的推文不应该已经存在,原因很明显。)我从中获得了很多性能提升?由于这仍然是 运行,我很想稍微优化一下代码并在里面找一些 faster/more 高效的工作人员,这样我就可以赶上了! 运行 预审工作人员可以批量处理吗? (也就是说,我可以批量创建不存在的用户,或类似的东西?)
我是 运行 数字海洋上的 4 Core/8Gb Ram 液滴,所以感觉这是一些非常糟糕的性能,并且可能与代码有关。我哪里错了?
(我在这里发布了这个而不是代码审查,因为我认为这与 SO 的问答格式相关,因为我正在尝试解决特定的代码问题,而不是 'how can I do this generally better?')
注意: 我在 django 1.6 中工作,因为这是我已经使用了一段时间的代码,当时对升级没有信心 - 它是不是 public 面对,所以除非现在有令人信服的理由(比如这个性能问题),否则我不会升级(对于这个项目)。
流监听器:
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
# Twitter returns data in JSON format - we need to decode it first
decoded = json.loads(data)
#print type(decoded), decoded
# Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users
try:
if decoded['lang'] == 'en':
django_rq.enqueue(read_both, decoded)
else:
pass
except KeyError,e:
print "Error on Key", e
except DataError, e:
print "DataError", e
return True
def on_error(self, status):
print status
阅读User/Tweet/Both
def read_user(tweet):
from harvester.models import User
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
#We might get weird results where user has changed their details"], so first we check the UID.
#print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
try:
current_user = User.objects.get(id_str=tweet["user"]["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
return current_user, False
if not tweet["user"]["follow_request_sent"]:
tweet["user"]["follow_request_sent"] = False
if not tweet["user"]["following"]:
tweet["user"]["following"] = False
if not tweet["user"]["description"]:
tweet["user"]["description"] = " "
if not tweet["user"]["notifications"]:
tweet["user"]["notifications"] = False
#If that doesn't work"], then we'll use get_or_create (as a failback rather than save())
from dateutil.parser import parse
if not tweet["user"]["contributors_enabled"]:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = False,
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
else:
current_user, created = User.objects.get_or_create(
follow_request_sent=tweet["user"]["follow_request_sent"],
_json = {},
verified = tweet["user"]["verified"],
followers_count = tweet["user"]["followers_count"],
profile_image_url_https = tweet["user"]["profile_image_url_https"],
id_str = tweet["user"]["id_str"],
listed_count = tweet["user"]["listed_count"],
utc_offset = tweet["user"]["utc_offset"],
statuses_count = tweet["user"]["statuses_count"],
description = tweet["user"]["description"],
friends_count = tweet["user"]["friends_count"],
location = tweet["user"]["location"],
profile_image_url= tweet["user"]["profile_image_url"],
following = tweet["user"]["following"],
geo_enabled = tweet["user"]["geo_enabled"],
profile_background_image_url =tweet["user"]["profile_background_image_url"],
screen_name = tweet["user"]["screen_name"],
lang = tweet["user"]["lang"],
profile_background_tile = tweet["user"]["profile_background_tile"],
favourites_count = tweet["user"]["favourites_count"],
name = tweet["user"]["name"],
notifications = tweet["user"]["notifications"],
url = tweet["user"]["url"],
created_at = parse(tweet["user"]["created_at"]),
contributors_enabled = tweet["user"]["contributers_enabled"],
time_zone = tweet["user"]["time_zone"],
protected = tweet["user"]["protected"],
default_profile = tweet["user"]["default_profile"],
is_translator = tweet["user"]["is_translator"]
)
#print "CURRENT USER:""], type(current_user)"], current_user
#current_user"], created = User.objects.get_or_create(current_user)
return current_user, created
def read_tweet(tweet, current_user):
import logging
logger = logging.getLogger('django')
from datetime import date, datetime
#print "Inside read_Tweet"
from harvester.models import Tweet
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.db import DataError
#We might get weird results where user has changed their details"], so first we check the UID.
#print tweet_data["created_at"]
from dateutil.parser import parse
tweet["created_at"] = parse(tweet["created_at"])
try:
#print "trying tweet_data["id"
current_tweet =Tweet.objects.get(id_str=tweet["id_str"])
created=False
return current_user, created
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
current_tweet =Tweet.objects.filter(id_str=tweet["id_str"])[0]
try:
current_tweet, created = Tweet.objects.get_or_create(
truncated=tweet["truncated"],
text=tweet["text"],
favorite_count=tweet["favorite_count"],
author = current_user,
_json = {},
source=tweet["source"],
retweeted=tweet["retweeted"],
coordinates = tweet["coordinates"],
entities = tweet["entities"],
in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
id_str = tweet["id_str"],
retweet_count = tweet["retweet_count"],
favorited = tweet["favorited"],
user = tweet["user"],
geo = tweet["geo"],
in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
lang = tweet["lang"],
created_at = tweet["created_at"],
place = tweet["place"])
print "DEBUG", current_user, current_tweet
return current_tweet, created
except DataError, e:
#Catchall to pick up non-parsed tweets
print "DEBUG ERROR", e, tweet
return None, False
def read_both(tweet):
current_user, created = read_user(tweet)
current_tweet, created = read_tweet(tweet, current_user)
我最终设法拼凑了一些 redditor 的答案和其他一些东西。
从根本上说,虽然我在 id_str 字段上进行了双重查找,但该字段未编入索引。我在 read_tweet
和 read_user
上向该字段添加了索引 db_index=True
,并将阅读推文移动到 try/except Tweet.objects.create
方法,回落到 get_or_create 如果出现问题,并且速度提高了 50-60 倍,现在可以扩展工作人员 - 如果我添加 10 名工作人员,我将获得 10 倍的速度。
我目前有一名员工每秒可以愉快地处理 6 条左右的推文。接下来,我将添加一个监控守护进程来检查队列大小,如果它仍在增加,则添加额外的工作人员。
tl;dr - 记住索引!