Django celery 任务:字典在迭代过程中改变了大小

Django celery task: dictionary changed size during iteration

在 Django 1.8 中,我有这个视图供用户 post 使用 celery 函数通知用户 post 的关注者,但它会产生一个相当混乱的错误:

 dictionary changed size during iteration

视图如下:

  @login_required
    def topic_reply(request, topic_id):
        tform = PostForm()
        topic = Topic.objects.get(pk=topic_id)
        args = {}
        posts = Post.objects.filter(topic= topic)
        posts =  Paginator(posts, DJANGO_SIMPLE_FORUM_REPLIES_PER_PAGE)

        if request.method == 'POST':
            post = PostForm(request.POST)

            if post.is_valid():
                p = post.save(commit = False)
                p.topic = topic
                p.title = post.cleaned_data['title']
                p.body = post.cleaned_data['body']
                p.creator = request.user
                p.user_ip = request.META['REMOTE_ADDR']

                if len(p.title)< 1:
                                p.title=p.body[:60]                                              
                    p.save()

                    #notify followers of the new post creation                        
                    title = 'title' #topic.title
                    link = 'bla' #topic.slug        
                    flwd = request.user
                    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
                    flwrs = User.objects.filter(id__in= flwr_ids).values('username','email') 

                    notify_new_post.delay(flwd, flwrs , title, link) #<- here the is the problem

                    return HttpResponseRedirect('/forum/topic/%s/?page=%s'  % (topic.slug, posts.num_pages))
                else:
                    return HttpResponseRedirect('/forum/topic/%s/?page=%s'  % (topic.slug, posts.num_pages))
        else:
            args.update(csrf(request))
            args['form'] = tform
            args['topic'] = topic
            return render_to_response('myforum/reply.html', args, 
                                      context_instance=RequestContext(request))

这甚至发生在任何东西被传递给函数之前(我没有看到 celery 守护进程发生任何事情)

这是芹菜函数:

#@app.task
@task()
def notify_new_post(flwd, flwrs, topic, link):
    print 'post notification \n'
    subject = 'New post'
    from_email = 'noreply@example.com'
    #to_list = [email]     

    for f in flwrs:

        to_email = f['email'].encode('ascii')
        print "[to_email]: " , [to_email]
        args = Context({
            'flwd': flwd,
            'recepient': f['username'],
            'link': link
           })  
       # if to_email !=[]:               
        plaintext = get_template('myforum/email_new_post.txt')
        htmltext = get_template('myforum/email_new_post.html')

        text_content = plaintext.render(args)
        html_content = htmltext.render(args)

        msg = EmailMultiAlternatives(subject, text_content, from_email, [to_email])
        msg.attach_alternative(html_content, "text/html")
        try:
            msg.send()
            print "[to_email]: " , [to_email]
            print 'message sent! \n'
        except Exception as e:  
            print '%s (%s)' % (e.message, type(e))

这对我来说很奇怪,因为 'topic' 视图的非常相似的任务运行得非常好。我对此感到非常困惑,非常感谢您的提示。

更新:这里是回溯

Environment:


Request Method: POST
Request URL: http://127.0.0.1:8000/forum/reply/52/

Django Version: 1.8.3
Python Version: 2.7.3
Installed Applications:
('django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'django.contrib.humanize',
 'registration',
 'aricle',
 'photo',
 'contact',
 'captcha',
 'pure_pagination',
 'emoticons',
 'debug_toolbar',
 'django_markdown',
 'myforum',
 'userprofile',
 'userpics',
 'djcelery')
Installed Middleware:
(u'debug_toolbar.middleware.DebugToolbarMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware',
 'userprofile.middleware.ActiveUserMiddleware')
Traceback:
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/django/core/handlers/base.py" in get_response
  132.                     response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/django/contrib/auth/decorators.py" in _wrapped_view
  22.                 return view_func(request, *args, **kwargs)
File "/home/mypc/myproj/myforum/views.py" in topic_reply
  315.                 notify_new_post.delay(flwd, flwrs , title, link)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/celery/app/task.py" in delay
  453.         return self.apply_async(args, kwargs)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/celery/app/task.py" in apply_async
  555.             **dict(self._get_exec_options(), **options)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/celery/app/base.py" in send_task
  353.                 reply_to=reply_to or self.oid, **options
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/celery/app/amqp.py" in publish_task
  305.             **kwargs
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/messaging.py" in publish
  161.             compression, headers)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/messaging.py" in _prepare
  237.              body) = dumps(body, serializer=serializer)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/serialization.py" in dumps
  164.             payload = encoder(data)
File "/usr/lib/python2.7/contextlib.py" in __exit__
  35.                 self.gen.throw(type, value, traceback)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/serialization.py" in _reraise_errors
  59.         reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/serialization.py" in _reraise_errors
  55.         yield
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/serialization.py" in dumps
  164.             payload = encoder(data)
File "/home/mypc/.projenv/local/lib/python2.7/site-packages/kombu/serialization.py" in pickle_dumps
  356.         return dumper(obj, protocol=pickle_protocol)

Exception Type: EncodeError at /forum/reply/52/
Exception Value: dictionary changed size during iteration

这可能是因为您为 flwrs 传递的值是 django.db.models.query.ValuesQuerySet。 Django 查询集是延迟求值的,我认为这不利于序列化。请记住,您发送给 Celery 任务的所有内容以及从中发送的 return 都必须进行序列化。因此,除了简单类型或您知道可以序列化的类型之外,不建议传递任何其他类型(例如,您自己设计的 class 或您已经从里到外检查以确保将序列化的类型干净利落)。

我建议的最小修正是通过 list(flwrs) 而不是 flwrs。这会将查询集变成普通的 list。我还强烈建议将 request.user.id 作为 flwd 而不是用户对象本身传递。传递 ORM 对象是获得意外行为的可靠方法。 (Celery 文档提到了这一点。)在 Celery 任务中传递一个 id 并重新获取对象是可行的方法。

但是,当我整体查看代码时,我不明白为什么数据库访问是在视图中而不是在 Celery 任务中执行的。因此,除非我在某处遗漏了一行或变量使用,否则我会将您的代码更改为仅将 request.user.id 作为 flwd 传递,然后在 Celery 任务中执行数据库访问。所以视图会像这样调用任务:

#notify followers of the new post creation                        
title = 'title' #topic.title
link = 'bla' #topic.slug        
notify_new_post.delay(request.user.id, title, link)

任务会这样开始:

from django.contrib.auth import get_user_model
@task()
def notify_new_post(flwd_id, topic, link):
    user_model = get_user_model()
    flwd = user_model.objects.get(id=flwd_id)
    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
    flwrs = user_model.objects.filter(id__in= flwr_ids).values('username','email') 

(关于最后一行的注意事项:我假设 User 是您的 Django 项目使用的用户模型,所以我使用 [=22= 的 return 值](分配给 user_model)而不是直接使用 User。如果我的假设不正确并且 User 是其他东西,那么您必须像最初那样使用 User做了。)