我应该如何避免使用 mailgun、taskqueue 和 ndb 发送重复的电子邮件?
How should I avoid sending duplicate emails using mailgun, taskqueue and ndb?
我正在使用任务队列 API 发送多封电子邮件是使用 mailgun 的小组。我的代码看起来大致像这样:
class CpMsg(ndb.Model):
group = ndb.KeyProperty()
sent = ndb.BooleanProperty()
#Other properties
def send_mail(messages):
"""Sends a request to mailgun's API"""
# Some code
pass
class MailTask(TaskHandler):
def post(self):
p_key = utils.key_from_string(self.request.get('p'))
msgs = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE)
if msgs:
send_mail(msgs)
for msg in msgs:
msg.sent = True
ndb.put_multi(msgs)
#Call the task again in COOLDOWN seconds
上面的代码一直运行良好,但根据文档,任务队列 API 保证任务交付 至少一次,所以任务应该是幂等的。现在,大多数情况下,上述代码就是这种情况,因为它只获取 'sent' 属性 等于 False 的消息。问题是非祖先 ndb 查询只是最终一致的,这意味着如果任务快速连续执行两次,查询可能 return 过时的结果并包括刚刚发送的消息。
我考虑过为消息包含一个祖先,但由于发送的电子邮件将达到数千封,我担心这可能意味着拥有大型实体组,写入吞吐量有限。
我应该使用祖先来进行查询吗?或者也许有一种方法可以配置 mailgun 以避免两次发送相同的电子邮件?我是否应该接受在极少数情况下可能会多次发送几封电子邮件的风险?
避免最终一致性障碍的一种可能方法是使查询成为 keys_only
查询,然后遍历消息键以通过键查找(强一致性)获取实际消息,检查是否 msg.sent
为 True 并在这种情况下跳过发送这些消息。沿着这些线的东西:
msg_keys = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE, keys_only=True)
if not msg_keys:
return
msgs = ndb.get_multi(msg_keys)
msgs_to_send = []
for msg in msgs:
if not msg.sent:
msgs_to_send.append(msg)
if msgs_to_send:
send_mail(msgs_to_send)
for msg in msgs_to_send:
msg.sent = True
ndb.put_multi(msgs_to_send)
您还必须使 post
调用具有事务性(使用 @ndb.transactional()
装饰器)。
这应该解决由查询最终一致性引起的重复问题。但是,由于数据存储争用(或任何其他原因)导致的事务重试仍然存在重复空间 - 因为 send_mail()
调用不是幂等的。一次发送一条消息(可能使用任务队列)可以减少这种情况发生的可能性。另见
我正在使用任务队列 API 发送多封电子邮件是使用 mailgun 的小组。我的代码看起来大致像这样:
class CpMsg(ndb.Model):
group = ndb.KeyProperty()
sent = ndb.BooleanProperty()
#Other properties
def send_mail(messages):
"""Sends a request to mailgun's API"""
# Some code
pass
class MailTask(TaskHandler):
def post(self):
p_key = utils.key_from_string(self.request.get('p'))
msgs = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE)
if msgs:
send_mail(msgs)
for msg in msgs:
msg.sent = True
ndb.put_multi(msgs)
#Call the task again in COOLDOWN seconds
上面的代码一直运行良好,但根据文档,任务队列 API 保证任务交付 至少一次,所以任务应该是幂等的。现在,大多数情况下,上述代码就是这种情况,因为它只获取 'sent' 属性 等于 False 的消息。问题是非祖先 ndb 查询只是最终一致的,这意味着如果任务快速连续执行两次,查询可能 return 过时的结果并包括刚刚发送的消息。
我考虑过为消息包含一个祖先,但由于发送的电子邮件将达到数千封,我担心这可能意味着拥有大型实体组,写入吞吐量有限。
我应该使用祖先来进行查询吗?或者也许有一种方法可以配置 mailgun 以避免两次发送相同的电子邮件?我是否应该接受在极少数情况下可能会多次发送几封电子邮件的风险?
避免最终一致性障碍的一种可能方法是使查询成为 keys_only
查询,然后遍历消息键以通过键查找(强一致性)获取实际消息,检查是否 msg.sent
为 True 并在这种情况下跳过发送这些消息。沿着这些线的东西:
msg_keys = CpMsg.query(
CpMsg.group==p_key,
CpMsg.sent==False).fetch(BATCH_SIZE, keys_only=True)
if not msg_keys:
return
msgs = ndb.get_multi(msg_keys)
msgs_to_send = []
for msg in msgs:
if not msg.sent:
msgs_to_send.append(msg)
if msgs_to_send:
send_mail(msgs_to_send)
for msg in msgs_to_send:
msg.sent = True
ndb.put_multi(msgs_to_send)
您还必须使 post
调用具有事务性(使用 @ndb.transactional()
装饰器)。
这应该解决由查询最终一致性引起的重复问题。但是,由于数据存储争用(或任何其他原因)导致的事务重试仍然存在重复空间 - 因为 send_mail()
调用不是幂等的。一次发送一条消息(可能使用任务队列)可以减少这种情况发生的可能性。另见