如何可靠地处理队列?
How to reliably process a queue?
完成这个概念上简单的任务:使用队列,为每个条目发送一封电子邮件。
一个简单的方法是:
while true:
entry = queue.pop()
sendMail();
这里的问题是,如果消费者在弹出但 before/during 发送邮件后崩溃,则邮件丢失。所以你把它改成:
while true:
entry = queue.peek()
sendMail();
queue.pop();
但是现在,如果消费者在发送之后崩溃,但在弹出之前,当消费者回来时,邮件将再次发送。
处理此问题的最佳实践方法是什么?
发送电子邮件只是此处的一个示例,可以替代任何关键任务。还假设队列的弹出是已发送邮件的唯一记录,因此邮件子系统本身不记录任何内容。
您可以使用 JMS 队列。它给你交易。正确处理消息后,消息将从队列中删除。
你们要分工负责。您执行的主要操作:
- 选择电子邮件
- 发送邮件
- 成功时从队列中删除电子邮件
- 失败时重新发送电子邮件
让我们将操作分配给负责的对象:
- 挑选 — 给消费者
- 发送 — 给发件人
- 删除 — 结果处理程序
- 重新发送 — 到结果处理程序
所以,基于此,你需要有:一个单独的发件人,只发送邮件并通知操作状态;一个单独的电子邮件消费者,知道该做什么以及从哪里获取数据;操作结果的单独处理程序。
它应该像这样工作:
消费者从队列中挑选一封电子邮件。有没有例外?消费者处理它们。否则进行下一步。
消费者将电子邮件传递给发件人。发件人发送电子邮件。有没有例外?如果发件人无法自行处理异常,则发件人会通知任何已订阅的人。如果没有异常,通知邮件发送成功。
处理程序收到通知。有没有例外?没做什么。如果没有异常,从队列中删除一封电子邮件。如果删除电子邮件导致引发异常,则这是一个错误,应该修复。
消费者选择了另一封电子邮件...
然而,还有另一种选择。处理程序和消费者可以合并在一个实体中,但这种方法会导致明显的障碍。此外,您可以有两个队列:第一个队列应包含需要发送的电子邮件,另一个队列 - 传递给发件人。他们只是通过了,我们不知道他们是否已经发送。和以前一样,发件人应该是一个单独的实体,并且应该通知成功的发送操作。当消费者收到通知时,它会从第一个队列和第二个队列中删除一封电子邮件。如果消费者出现异常,它会查看第二个队列,找到未发送的电子邮件并重新发送它们。
异步交互和单一职责是我们最好的朋友。
我在这里提出两种解决方案。第一个是根据我的经验提出的设计(可以在进一步集思广益后详细说明),第二个是一个简短而快速的解决方案。看一看,琢磨一下,选择适合自己的。
漫漫长路——从零开始创造
如果您计划创建一个容错且高度可用的队列系统,您将必须解决您面临的主要挑战。
如何保证消息不丢失?
了解您的生产者和消费者: 为了设计解决方案,首先我们需要了解我们的生产者和消费者。 单一生产者 - 单一消费者。单一生产者 - 多个消费者。多个生产者 – 多个消费者。
最好的方法是创建一种机制来迎合多个生产者 - 多个消费者,除此之外,还可以配置以迎合这三种情况中的任何一种。
下一题;我们该怎么做? 简单的答案,如果我们能以某种方式创建一个可配置的机制,它能够接收多条消息并将其广播给多个消费者。该机制还具有读取配置、验证消息(可选,您也可以将其添加到消费者中)、短时间存储消息、跟踪确认、将一条消息分解为多条消息、将多条消息聚合为一条消息、具有“处理超时或故障时的“行动计划”并实施该“行动计划”。
详述该机制:让我们将此机制称为Broker。因此,在您的解决方案中,经纪人将按以下方式放置。实线箭头是消息,虚线箭头是确认。
我避免在这里详细介绍代理的设计,因为它会脱离上下文。
处理失败:识别可能的失败点
1.生产者
2.消费者
3.经纪人
4.网络
生产者失败:-如果存在复制,并且备用生产者继续发送消息而不影响功能,吞吐量可能会受到影响,直到原始生产者启动并且运行 再一次。
对于消费者故障和网络故障,代理可以维护一种机制,该机制将保留消息直到确认(为简洁起见,我们称之为确认)是已收到。一旦收到ack,就移除ack对应的消息。
消费者必须以稍微不同的方式处理这种情况。让我们说,消费者在状态中保留以下变量
一种。最后收到的消息
b.消费者状态 =(活动、休眠、重新启动)。
consumer启动的那一刻,它的值可以是(RE-STARTED)。consumer最后收到的消息随着从broker收到的每条消息更新,状态变为ACTIVE。如果消费者尝试向代理发送确认并且连接超时,或者网络存在问题,则状态更改为 DORMANT,并被保留。对于 RE-STARTED 和 DORMANT 两种场景,执行对 Last received 消息的处理是否完成的验证。如果是,它会再次向代理发送确认并等待下一条消息。收到下一条消息的那一刻,状态可以更改为 ACTIVE,并且可以正常开始处理。
另一方面,代理只保留最后发送的消息,直到收到确认。为了克服代理的故障,可以准备一个主从配置,其中复制代理的状态并将消息重定向到另一个代理,以防第一个代理不可用。
简短的解决方案:
使用@Marcin 建议的 JMS。我亲自研究过 RabbitMQ(http://previous.rabbitmq.com/v3_4_x/features.html) and feel that for most of the distributed computing scenarios, this will just work. You can configure high-availability (http://previous.rabbitmq.com/v3_4_x/ha.html),它还带有一个漂亮的用户界面,您可以在其中监控您的队列和消息。
但是,我们鼓励您检查适合您需要的 JMS 系统。
希望对您有所帮助
你的要求看起来不像是要解决二将问题(没有确定性solution/limit)? https://en.wikipedia.org/wiki/Two_Generals%27_Problem
查看 - 处理 - 删除
您只想在确保成功处理后才删除,并确保正确删除。好吧,这些消息中的任何一个都可能 lost/programs 可能在任何步骤崩溃。
最强大的消息队列依赖于一组 acks + 重复尝试(交付)来获得所需的行为(直到 acks 返回)。
但实际上不可能保证在每种情况下都表现完美。您只需要最终权衡可能性,并在重复(至少尝试)处理和 "never"(无限内存等)丢失消息之间做出工程妥协——具体取决于您的实际应用程序需求。这又不是一个新问题 :),而且您不太可能需要为其编写临时代码 - 就像我提到的那样,大多数 MQ 都解决了这个问题。
如果您认真对待此声明:
"Also assume that the popping of the queue is the only record of the mail having been sent,"
那你就不能保证处理的可靠属性。
证明:
假设我有一个保证可靠处理的假设程序 属性。 运行 程序一次,一旦它尝试发送电子邮件,我们 "intervene" 通过导致电子邮件失败并同时终止线程。然后,运行 程序(我想它会重新生成一个新线程)直到程序确定电子邮件是否已发送(这必须是程序中的一个点,否则程序会无限期地 运行s没有取得进展。)现在假设我们记录了程序的操作并在平行宇宙中回放(对随机数生成器的任何调用都应该 return 相同)我们通过使电子邮件成功并杀死同时线程。该程序必须得出与之前相同的结论,这是一个矛盾,因为该程序应该保证可靠的处理 属性 而没有任何关于是否发送电子邮件的记录,并且在其中一个模拟中肯定是发不发邮件搞错了
这是一个使用电子邮件系统的解决方案,该系统会告诉您是否已发送电子邮件,并且仅当您尚未发送电子邮件时才会发送电子邮件。
while true:
task = queue.peek()
if (task_email_sent_already(task)){
//Then we failed after emailing but before pop
goto pop_step;
}
if (task.done){
//Then we failed after doing the task but before sending email
goto email_step;
}
//run_task needs to be written transactionally to set task.done on completion.
//Think transactional memory with persistent logging.
run_task(task);
LABEL email_step
send_email_if_already_not_sent(task);
LABEL pop_step
queue.pop()
如果为同一任务调用两次,send_email_if_already_not_sent 不会发送两封电子邮件,这一点很重要,否则上面的代码可能会导致重复的电子邮件(如果电子邮件成功,但有一些 "lag time" 在 task_email_already_sent return 之前。)
如果您假设从电子邮件成功发送到 task_email_already_sent return 之前可以多长时间为真,即您假设没有电子邮件花费超过 5 秒到 运行,那么你可以只在本地日志中写下你在某个时间 X 发送了一封电子邮件,然后旋转直到时间 X+5 秒,然后才检查 task_email_sent_already。但这当然是有风险的,因为如果某些电子邮件的发送时间超过 5 秒,您可能会发送重复的电子邮件。
生产者的问题是它不知道消息是否已经被完全处理。所以:让消费者确认邮件已经到达,然后才弹出邮件。
还有一个小问题:消息已处理,但无法再发送确认。要解决此问题,请为每封邮件提供一个与邮件一起发送的唯一 ID。然后消费者可以识别重复项(但是,它必须以某种方式保留最新收到的 ID,以便这种方法在崩溃中幸存下来...)。
完成这个概念上简单的任务:使用队列,为每个条目发送一封电子邮件。
一个简单的方法是:
while true:
entry = queue.pop()
sendMail();
这里的问题是,如果消费者在弹出但 before/during 发送邮件后崩溃,则邮件丢失。所以你把它改成:
while true:
entry = queue.peek()
sendMail();
queue.pop();
但是现在,如果消费者在发送之后崩溃,但在弹出之前,当消费者回来时,邮件将再次发送。
处理此问题的最佳实践方法是什么?
发送电子邮件只是此处的一个示例,可以替代任何关键任务。还假设队列的弹出是已发送邮件的唯一记录,因此邮件子系统本身不记录任何内容。
您可以使用 JMS 队列。它给你交易。正确处理消息后,消息将从队列中删除。
你们要分工负责。您执行的主要操作:
- 选择电子邮件
- 发送邮件
- 成功时从队列中删除电子邮件
- 失败时重新发送电子邮件
让我们将操作分配给负责的对象:
- 挑选 — 给消费者
- 发送 — 给发件人
- 删除 — 结果处理程序
- 重新发送 — 到结果处理程序
所以,基于此,你需要有:一个单独的发件人,只发送邮件并通知操作状态;一个单独的电子邮件消费者,知道该做什么以及从哪里获取数据;操作结果的单独处理程序。
它应该像这样工作:
消费者从队列中挑选一封电子邮件。有没有例外?消费者处理它们。否则进行下一步。
消费者将电子邮件传递给发件人。发件人发送电子邮件。有没有例外?如果发件人无法自行处理异常,则发件人会通知任何已订阅的人。如果没有异常,通知邮件发送成功。
处理程序收到通知。有没有例外?没做什么。如果没有异常,从队列中删除一封电子邮件。如果删除电子邮件导致引发异常,则这是一个错误,应该修复。
消费者选择了另一封电子邮件...
然而,还有另一种选择。处理程序和消费者可以合并在一个实体中,但这种方法会导致明显的障碍。此外,您可以有两个队列:第一个队列应包含需要发送的电子邮件,另一个队列 - 传递给发件人。他们只是通过了,我们不知道他们是否已经发送。和以前一样,发件人应该是一个单独的实体,并且应该通知成功的发送操作。当消费者收到通知时,它会从第一个队列和第二个队列中删除一封电子邮件。如果消费者出现异常,它会查看第二个队列,找到未发送的电子邮件并重新发送它们。
异步交互和单一职责是我们最好的朋友。
我在这里提出两种解决方案。第一个是根据我的经验提出的设计(可以在进一步集思广益后详细说明),第二个是一个简短而快速的解决方案。看一看,琢磨一下,选择适合自己的。
漫漫长路——从零开始创造
如果您计划创建一个容错且高度可用的队列系统,您将必须解决您面临的主要挑战。
如何保证消息不丢失?
了解您的生产者和消费者: 为了设计解决方案,首先我们需要了解我们的生产者和消费者。 单一生产者 - 单一消费者。单一生产者 - 多个消费者。多个生产者 – 多个消费者。 最好的方法是创建一种机制来迎合多个生产者 - 多个消费者,除此之外,还可以配置以迎合这三种情况中的任何一种。
下一题;我们该怎么做? 简单的答案,如果我们能以某种方式创建一个可配置的机制,它能够接收多条消息并将其广播给多个消费者。该机制还具有读取配置、验证消息(可选,您也可以将其添加到消费者中)、短时间存储消息、跟踪确认、将一条消息分解为多条消息、将多条消息聚合为一条消息、具有“处理超时或故障时的“行动计划”并实施该“行动计划”。
详述该机制:让我们将此机制称为Broker。因此,在您的解决方案中,经纪人将按以下方式放置。实线箭头是消息,虚线箭头是确认。
我避免在这里详细介绍代理的设计,因为它会脱离上下文。
处理失败:识别可能的失败点 1.生产者 2.消费者 3.经纪人 4.网络
生产者失败:-如果存在复制,并且备用生产者继续发送消息而不影响功能,吞吐量可能会受到影响,直到原始生产者启动并且运行 再一次。
对于消费者故障和网络故障,代理可以维护一种机制,该机制将保留消息直到确认(为简洁起见,我们称之为确认)是已收到。一旦收到ack,就移除ack对应的消息。
消费者必须以稍微不同的方式处理这种情况。让我们说,消费者在状态中保留以下变量 一种。最后收到的消息 b.消费者状态 =(活动、休眠、重新启动)。
consumer启动的那一刻,它的值可以是(RE-STARTED)。consumer最后收到的消息随着从broker收到的每条消息更新,状态变为ACTIVE。如果消费者尝试向代理发送确认并且连接超时,或者网络存在问题,则状态更改为 DORMANT,并被保留。对于 RE-STARTED 和 DORMANT 两种场景,执行对 Last received 消息的处理是否完成的验证。如果是,它会再次向代理发送确认并等待下一条消息。收到下一条消息的那一刻,状态可以更改为 ACTIVE,并且可以正常开始处理。
另一方面,代理只保留最后发送的消息,直到收到确认。为了克服代理的故障,可以准备一个主从配置,其中复制代理的状态并将消息重定向到另一个代理,以防第一个代理不可用。
简短的解决方案:
使用@Marcin 建议的 JMS。我亲自研究过 RabbitMQ(http://previous.rabbitmq.com/v3_4_x/features.html) and feel that for most of the distributed computing scenarios, this will just work. You can configure high-availability (http://previous.rabbitmq.com/v3_4_x/ha.html),它还带有一个漂亮的用户界面,您可以在其中监控您的队列和消息。
但是,我们鼓励您检查适合您需要的 JMS 系统。
希望对您有所帮助
你的要求看起来不像是要解决二将问题(没有确定性solution/limit)? https://en.wikipedia.org/wiki/Two_Generals%27_Problem
查看 - 处理 - 删除
您只想在确保成功处理后才删除,并确保正确删除。好吧,这些消息中的任何一个都可能 lost/programs 可能在任何步骤崩溃。
最强大的消息队列依赖于一组 acks + 重复尝试(交付)来获得所需的行为(直到 acks 返回)。
但实际上不可能保证在每种情况下都表现完美。您只需要最终权衡可能性,并在重复(至少尝试)处理和 "never"(无限内存等)丢失消息之间做出工程妥协——具体取决于您的实际应用程序需求。这又不是一个新问题 :),而且您不太可能需要为其编写临时代码 - 就像我提到的那样,大多数 MQ 都解决了这个问题。
如果您认真对待此声明:
"Also assume that the popping of the queue is the only record of the mail having been sent,"
那你就不能保证处理的可靠属性。
证明: 假设我有一个保证可靠处理的假设程序 属性。 运行 程序一次,一旦它尝试发送电子邮件,我们 "intervene" 通过导致电子邮件失败并同时终止线程。然后,运行 程序(我想它会重新生成一个新线程)直到程序确定电子邮件是否已发送(这必须是程序中的一个点,否则程序会无限期地 运行s没有取得进展。)现在假设我们记录了程序的操作并在平行宇宙中回放(对随机数生成器的任何调用都应该 return 相同)我们通过使电子邮件成功并杀死同时线程。该程序必须得出与之前相同的结论,这是一个矛盾,因为该程序应该保证可靠的处理 属性 而没有任何关于是否发送电子邮件的记录,并且在其中一个模拟中肯定是发不发邮件搞错了
这是一个使用电子邮件系统的解决方案,该系统会告诉您是否已发送电子邮件,并且仅当您尚未发送电子邮件时才会发送电子邮件。
while true:
task = queue.peek()
if (task_email_sent_already(task)){
//Then we failed after emailing but before pop
goto pop_step;
}
if (task.done){
//Then we failed after doing the task but before sending email
goto email_step;
}
//run_task needs to be written transactionally to set task.done on completion.
//Think transactional memory with persistent logging.
run_task(task);
LABEL email_step
send_email_if_already_not_sent(task);
LABEL pop_step
queue.pop()
如果为同一任务调用两次,send_email_if_already_not_sent 不会发送两封电子邮件,这一点很重要,否则上面的代码可能会导致重复的电子邮件(如果电子邮件成功,但有一些 "lag time" 在 task_email_already_sent return 之前。)
如果您假设从电子邮件成功发送到 task_email_already_sent return 之前可以多长时间为真,即您假设没有电子邮件花费超过 5 秒到 运行,那么你可以只在本地日志中写下你在某个时间 X 发送了一封电子邮件,然后旋转直到时间 X+5 秒,然后才检查 task_email_sent_already。但这当然是有风险的,因为如果某些电子邮件的发送时间超过 5 秒,您可能会发送重复的电子邮件。
生产者的问题是它不知道消息是否已经被完全处理。所以:让消费者确认邮件已经到达,然后才弹出邮件。
还有一个小问题:消息已处理,但无法再发送确认。要解决此问题,请为每封邮件提供一个与邮件一起发送的唯一 ID。然后消费者可以识别重复项(但是,它必须以某种方式保留最新收到的 ID,以便这种方法在崩溃中幸存下来...)。