使用 Vertx 处理结果组 - 如何协调?

Processing Groups of Results with Vertx - How to coordinate?

我有一个作业处理系统,其中每个作业都包含数千个需要不同策略才能完成的单独任务。单个任务构成了整个工作。如果所有任务都已完成,则该作业被标记为成功完成并采取其他步骤,如果任何任务失败,则必须将作业标记为失败并采取其他步骤,如果作业超时则必须标记作业失败并采取其他步骤。

收到作业的所有结果后,即可获取下一个作业。当前正在处理作业时不应提取下一个作业。

流程如下所示:

Job Polling Verticle 将作业发布到事件总线,Job Processing Verticle 将每个任务发布到事件总线。当作业策略完成时,它会将任务结果发布到事件总线。

问题是我不知道正确 方法来确定何时在此模型中完成所有任务。所有的 Verticle 都是无状态的,Job Processing Verticle 不等待任何未来,即使 Job Results Verticle 是有状态的,它也不知道它应该期望多少结果。

我能想到的唯一方法是拥有一个全局有状态对象。但是我不认为这是好的设计。

此外,我需要知道作业何时超时。也就是说,它 运行 比应该的长,我需要考虑它失败,记录它,然后继续。

我可以用全局状态来做到这一点,但我还是认为这不是正确的解决方案。

这个垂直模式对我正在尝试做的事情有意义吗?

首先,让我尝试回答您的问题。那我就试着解释一下这个设计有什么问题吧

The issue is that I don't know the right way to determine when all tasks have been completed in this model. All verticles are stateless, The Job Processing Verticle doesn't await any futures, and even if the Job Results Verticle was stateful, it doesn't know how many results it should expect.

解决方案可以是引用计数verticle。每个 worker 应该在事件总线上发出一个 start message 并在它开始时使用 jobId,并在它完成时使用 end messagejobId。即使你有扇出(在这种情况下你不知道有多少工人),计算 Verticle 也会知道。在您的图表中,"Job Post Processing Verticle" 是一个很好的选择。它可以维护一个计数器,只有当它归零时,它才应该开始下一个工作。这也有助于避免实际共享一些内存引用。

Additionally, I need to know when a Job has timed out. That is, it's run longer than it should and I need to consider it's failed, log it, and move on.

在同一个 Verticle 中,您可以在每次获得新的 start message 时启动一个计时器。如果得到 end message,取消计时器。否则,取消当前作业并重新开始。

现在,这个解决方案可行,但设计有两个主要缺陷。一个是您似乎将所有流程都保存在内存中。如果您的应用程序崩溃,所有进度都会丢失,并且不清楚您是如何记录它的。也许在数据库中轮询 Jobs table 实际上会更好,因为无论如何你的作业执行都是顺序的。

第二点是所有这些超时和引用计数都是结构化并发的自制实现。也许您应该为此看一下 Kotlin 协程之类的东西,它会为您处理许多问题。