How do I distribute tasks to multiple celery workers in parallel?

It seems like every Celery question is 5 to 10 years old, using old Celery versions and design patterns.

Using Celery version 5.0.5.

I have a Celery task that queries a database and then performs some computations/calculations on each row the query returns.

The problem is that this task takes several minutes to complete, because the query returns thousands of rows, so I am trying to distribute the work to multiple Celery workers in parallel.

import os
from datetime import datetime


@celery.task()
def send_sms(to, body):
    from twilio.rest import Client

    account_sid = os.environ["ACCOUNT_SID"]
    auth_token = os.environ["AUTH_TOKEN"]
    from_ = os.environ["NUMBER"]

    client = Client(
        account_sid,
        auth_token,
    )

    message = client.messages.create(
        to=to,
        from_=from_,
        body=body,
    )


@celery.task()
def notify_users():
    session = create_session()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    today = datetime.now()
    for q in query:
        if q.returned_date is not None:
            if (today - q.returned_date).total_seconds() < q.rental_period:
                continue


        user = session.query(Users).filter(Users.id == q.user_id).one()

        to = send_notification_get_to.get(q.notification_method)(user)
        body = f"sending email to {user.email}"

        send_sms.delay(to, body)

What is the best way to distribute these tasks across multiple workers, instead of having a single worker run for several minutes? As the number of rows returned grows from a few thousand to tens of thousands, it slows down dramatically.

I had the same use case before. What I did was paginate the query (split the records into smaller chunks), and have each page processed by a Celery worker.
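To make the pagination idea concrete, here is a minimal, framework-free sketch of how limit/offset paging splits a result set into pages (a plain Python list stands in for the rows a query would return; all names here are illustrative):

```python
import math

rows = list(range(250))  # stand-in for the rows the query would return
chunk_size = 100         # rows per page, i.e. per worker task

# number of pages needed to cover every row (ceiling division)
num_pages = math.ceil(len(rows) / chunk_size)

pages = []
for skip in range(num_pages):
    # equivalent of query.limit(chunk_size).offset(skip * chunk_size)
    page = rows[skip * chunk_size:(skip + 1) * chunk_size]
    pages.append(page)

# every row appears exactly once across the pages
assert sum(pages, []) == rows
print(num_pages)  # 3
```

Each page then becomes the argument of one dispatched task, so the work runs on as many workers as there are pages.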

You can also try using a different worker pool, such as the gevent or eventlet pool, for better performance on I/O-bound tasks like this one.

The code would look like this:

import os
from datetime import datetime


@celery.task()
def send_sms(to, body):
    from twilio.rest import Client

    account_sid = os.environ["ACCOUNT_SID"]
    auth_token = os.environ["AUTH_TOKEN"]
    from_ = os.environ["NUMBER"]

    client = Client(
        account_sid,
        auth_token,
    )

    message = client.messages.create(
        to=to,
        from_=from_,
        body=body,
    )


@celery.task()
def notify_range_of_users(chunk_size, skip):
    session = create_session()
    today = datetime.now()

    query = session.query(Rentals).filter(Rentals.enabled == True)

    # fetch one page: `chunk_size` rows starting at row `skip * chunk_size`
    paginated_query = query.limit(chunk_size).offset(skip * chunk_size)
    for q in paginated_query:
        # skip rentals that were returned within the rental period
        if q.returned_date is not None:
            if (today - q.returned_date).total_seconds() < q.rental_period:
                continue

        user = session.query(Users).filter(Users.id == q.user_id).one()

        to = send_notification_get_to.get(q.notification_method)(user)
        body = f"sending email to {user.email}"

        send_sms.delay(to, body)


@celery.task()
def notify_users():
    session = create_session()

    query = session.query(Rentals).filter(Rentals.enabled == True)
    total_rentals = query.count()

    # each chunk (page) will contain 100 rows/objects
    chunk_size = 100

    # find the total number of jobs needed to cover every row
    quo, remainder = divmod(total_rentals, chunk_size)
    jobs = quo
    if remainder:
        jobs = jobs + 1

    # dispatch one task per page; `skip` is the page index
    for skip in range(jobs):
        notify_range_of_users.delay(chunk_size, skip)