如何将任务并行分配给多个 celery worker?
How do I distribute tasks to multiple celery workers in parallel?
It seems like every Celery question is 5 to 10 years old and
uses outdated Celery versions and design patterns.
使用 Celery 版本 5.0.5
我有一个查询数据库的 Celery 任务,然后对查询
返回的每一行执行一些计算(computations/calculations)。
问题是这个任务需要几分钟才能完成,因为查询返回了数千行,所以我想把工作并行分发给多个 Celery worker。
@celery.task()
def send_sms(to, body):
    """Send a single SMS via Twilio.

    Credentials and the sender number are read from the environment:
    ACCOUNT_SID, AUTH_TOKEN, NUMBER.
    """
    from twilio.rest import Client

    # Build the client from environment-provided credentials.
    client = Client(os.environ["ACCOUNT_SID"], os.environ["AUTH_TOKEN"])
    client.messages.create(
        to=to,
        from_=os.environ["NUMBER"],
        body=body,
    )
@celery.task()
def notify_users():
    """Scan every enabled rental and queue an SMS for each overdue one.

    A rental is notified when it has no ``returned_date`` or when the time
    since ``returned_date`` exceeds its ``rental_period`` (seconds).
    """
    session = create_session()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    # NOTE(review): naive local time — confirm it matches the tz of
    # ``returned_date`` stored in the database.
    today = datetime.now()
    for rental in query:
        if rental.returned_date is not None:
            # Still within the rental period: nothing to send.
            if (today - rental.returned_date).total_seconds() < rental.rental_period:
                continue
        # BUG FIX: the original filtered on ``User.id`` but queried the
        # ``Users`` class — use the same class name consistently.
        user = session.query(Users).filter(Users.id == rental.user_id).one()
        to = send_notification_get_to.get(rental.notification_method)(user)
        body = f"sending email to {user.email}"
        send_sms.delay(to, body)
将这些任务分配给多个 worker 的最佳方式是什么?否则只由一个 worker 运行需要几分钟,而且随着返回的行数从几千增加到几万,耗时还会急剧增长。
我之前有过同样的用例,我做的是
我对查询进行了分页(将记录分成更小的块)并且每个页面都由 celery worker 处理
您也可以尝试使用不同的工作池,如 gevent、eventlet 池以获得更好的性能。
代码如下所示。
@celery.task()
def send_sms(to, body):
    """Deliver one SMS message through the Twilio REST API.

    Reads ACCOUNT_SID, AUTH_TOKEN and NUMBER from the environment.
    """
    from twilio.rest import Client

    account_sid = os.environ["ACCOUNT_SID"]
    auth_token = os.environ["AUTH_TOKEN"]
    sender = os.environ["NUMBER"]
    Client(account_sid, auth_token).messages.create(
        to=to,
        from_=sender,
        body=body,
    )
@celery.task()
def notify_range_of_users(num_chunks, skip):
    """Process one page of enabled rentals and queue overdue SMS alerts.

    Args:
        num_chunks: rows per page (despite the name, this is the page SIZE,
            not the number of pages — kept for caller compatibility).
        skip: zero-based page index; the query offset is ``skip * num_chunks``.
    """
    session = create_session()
    # NOTE(review): naive local time — confirm tz matches ``returned_date``.
    today = datetime.now()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    page = query.limit(num_chunks).offset(skip * num_chunks)
    for rental in page:
        if rental.returned_date is not None:
            # Still inside the rental period: skip.
            if (today - rental.returned_date).total_seconds() < rental.rental_period:
                continue
        # BUG FIX: the original filtered on ``User.id`` but queried the
        # ``Users`` class — use the same class name consistently.
        user = session.query(Users).filter(Users.id == rental.user_id).one()
        to = send_notification_get_to.get(rental.notification_method)(user)
        body = f"sending email to {user.email}"
        send_sms.delay(to, body)
@celery.task()
def notify_users():
    """Fan out overdue-rental notification work to workers, one task per page.

    Counts the enabled rentals, splits them into fixed-size pages and
    enqueues one ``notify_range_of_users`` task per page so multiple
    workers process the result set in parallel.
    """
    session = create_session()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    total_rentals = query.count()
    # Rows per page. The name ``num_chunks`` is kept because
    # ``notify_range_of_users`` receives it, but it is really the page size.
    num_chunks = 100
    # Number of pages = ceil(total / page size).
    jobs, remainder = divmod(total_rentals, num_chunks)
    if remainder:
        jobs += 1
    # FIX: use the loop variable as the page index directly instead of a
    # manually incremented counter; also dropped an unused ``today`` local.
    for skip in range(jobs):
        notify_range_of_users.delay(num_chunks, skip)
It seems like every Celery question is 5 to 10 years old and uses outdated Celery versions and design patterns.
使用 Celery 版本 5.0.5
我有一个查询数据库的 Celery 任务,然后对查询
返回的每一行执行一些计算。问题是这个任务需要几分钟才能完成,因为查询返回了数千行,所以我想把工作并行分发给多个 Celery worker。
@celery.task()
def send_sms(to, body):
    """Send a single SMS via Twilio.

    Credentials and the sender number are read from the environment:
    ACCOUNT_SID, AUTH_TOKEN, NUMBER.
    """
    from twilio.rest import Client

    # Build the client from environment-provided credentials.
    client = Client(os.environ["ACCOUNT_SID"], os.environ["AUTH_TOKEN"])
    client.messages.create(
        to=to,
        from_=os.environ["NUMBER"],
        body=body,
    )
@celery.task()
def notify_users():
    """Scan every enabled rental and queue an SMS for each overdue one.

    A rental is notified when it has no ``returned_date`` or when the time
    since ``returned_date`` exceeds its ``rental_period`` (seconds).
    """
    session = create_session()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    # NOTE(review): naive local time — confirm it matches the tz of
    # ``returned_date`` stored in the database.
    today = datetime.now()
    for rental in query:
        if rental.returned_date is not None:
            # Still within the rental period: nothing to send.
            if (today - rental.returned_date).total_seconds() < rental.rental_period:
                continue
        # BUG FIX: the original filtered on ``User.id`` but queried the
        # ``Users`` class — use the same class name consistently.
        user = session.query(Users).filter(Users.id == rental.user_id).one()
        to = send_notification_get_to.get(rental.notification_method)(user)
        body = f"sending email to {user.email}"
        send_sms.delay(to, body)
将这些任务分配给多个 worker 的最佳方式是什么?否则只由一个 worker 运行需要几分钟,而且随着返回的行数从几千增加到几万,耗时还会急剧增长。
我之前有过同样的用例,我做的是
我对查询进行了分页(将记录分成更小的块)并且每个页面都由 celery worker 处理
您也可以尝试使用不同的工作池,如 gevent、eventlet 池以获得更好的性能。
代码如下所示。
@celery.task()
def send_sms(to, body):
    """Deliver one SMS message through the Twilio REST API.

    Reads ACCOUNT_SID, AUTH_TOKEN and NUMBER from the environment.
    """
    from twilio.rest import Client

    account_sid = os.environ["ACCOUNT_SID"]
    auth_token = os.environ["AUTH_TOKEN"]
    sender = os.environ["NUMBER"]
    Client(account_sid, auth_token).messages.create(
        to=to,
        from_=sender,
        body=body,
    )
@celery.task()
def notify_range_of_users(num_chunks, skip):
    """Process one page of enabled rentals and queue overdue SMS alerts.

    Args:
        num_chunks: rows per page (despite the name, this is the page SIZE,
            not the number of pages — kept for caller compatibility).
        skip: zero-based page index; the query offset is ``skip * num_chunks``.
    """
    session = create_session()
    # NOTE(review): naive local time — confirm tz matches ``returned_date``.
    today = datetime.now()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    page = query.limit(num_chunks).offset(skip * num_chunks)
    for rental in page:
        if rental.returned_date is not None:
            # Still inside the rental period: skip.
            if (today - rental.returned_date).total_seconds() < rental.rental_period:
                continue
        # BUG FIX: the original filtered on ``User.id`` but queried the
        # ``Users`` class — use the same class name consistently.
        user = session.query(Users).filter(Users.id == rental.user_id).one()
        to = send_notification_get_to.get(rental.notification_method)(user)
        body = f"sending email to {user.email}"
        send_sms.delay(to, body)
@celery.task()
def notify_users():
    """Fan out overdue-rental notification work to workers, one task per page.

    Counts the enabled rentals, splits them into fixed-size pages and
    enqueues one ``notify_range_of_users`` task per page so multiple
    workers process the result set in parallel.
    """
    session = create_session()
    query = session.query(Rentals).filter(Rentals.enabled == True)
    total_rentals = query.count()
    # Rows per page. The name ``num_chunks`` is kept because
    # ``notify_range_of_users`` receives it, but it is really the page size.
    num_chunks = 100
    # Number of pages = ceil(total / page size).
    jobs, remainder = divmod(total_rentals, num_chunks)
    if remainder:
        jobs += 1
    # FIX: use the loop variable as the page index directly instead of a
    # manually incremented counter; also dropped an unused ``today`` local.
    for skip in range(jobs):
        notify_range_of_users.delay(num_chunks, skip)