顺序数据的并行性和故障转移

Parallelism and Failover of a Sequential Data

大家好!

我们有一个非常简单的应用程序适配器:它每 30 秒从一个系统的数据库中读取一次记录(不能写入),将这些记录中的每一个转换为内部格式,执行过滤、浓缩, ..., 最后,将生成的实体转换为 xml 格式,并通过 JMS 将它们发送到其他系统。没什么新鲜的。

让我们在这里添加一些香料:数据库中的记录是顺序(这意味着它们的标识是由序列生成的),当需要读取一组新记录时,我们会得到一个最后处理的序列号——它存储在我们的内部数据库中,并在每次处理下一条记录(发送到 JMS)时更新——和从该记录开始读取 (+1).

问题是我们的客户给了我们一个 NFR:读取记录束的处理时间不得超过 30 秒。至于工作流程中有很多步骤(有些步骤很长运行),并且有可能得到相当多的记录,就我们一个一个地处理它们而言,它可能需要超过 30 秒。

由于以上所有我想问2个问题:

1) 是否有一种并行处理顺序数据的方法,可能使用一个或多个中间存储,或 Disruptor 模式,或类似 cqrs,或基于通知,或...提供了一种可能性在这样的系统中工作?

2) 一般的。我需要存储一个最后处理的数字并将一个实体发送到 JMS。如果我将一个数字保存到数据库,然后 JMS 出现一些问题,在应用程序重新启动时我的适配器会认为它成功发送了实体,这是不正确的并且永远不会收到。如果我发送一个实体,然后尝试将一个数字保存到数据库并出现异常,则在应用程序重新启动时将执行重新处理,这将导致 JMS 中的重复。我不确定 xa 交易是否会在这里有所帮助或某种最后的资源策略...

有人可以分享经验或想法吗?

提前致谢!

1) 30 秒是一段很长的时间,在那段时间你可以做很多事情,尤其是不止一个 CPU。没有具体细节,我只能说,如果您对它进行分析并使用更多 CPUs.

,您可能会使其更快

2) 您可以在发送之前更新数据库并自己监听 JMS 队列以查看它是否被代理接收。

Dimitry - 我不知道你的问题的细节,所以我只是想做一组假设。我希望它会引发一个想法,至少会导致解决方案。

这里是:

  1. 获取要处理的项目列表。
  2. 存储最后一个 ID(可能还有起始 ID)
  3. 在不同的线程上处理每个项目(建议使用任务)。
  4. 在本地失败队列中记录任何失败的项目。
  5. 当你抓住下一批时,确保你首先处理失败的队列。
  6. 有确定最大重试次数的方法和moving/marking永久失败的方法。

不确定这是否是您想要的。 NServiceBus 有一个重试过程,每次重试之间的间隔变长到一个点,然后它被标记为失败。

伙计们,我们最终得出了以下解决方案。我们实现了一种 Actor Model。这个想法如下。

我们的应用程序有两个主要(内部)数据库 table,我们称它们为 READ_DATA_INFO,其中包含 'source' 外部数据库的最后读取记录号系统和 DUMPED_DATA,它存储有关源系统的每个读取记录的元数据。这就是它的工作原理:每 n(可配置 属性)秒,服务总线读取源系统最后处理的标识符,并向源系统发送请求以从中获取新记录。如果有几条新记录,它们将用 DumpRecordBunchMessage 消息包装并发送到 DumpActor class。此 class 开始一个包含两个操作的事务:更新最后读取的记录号(READ_DATA_INFO table)并保存有关每个记录的元数据(DUMPED_DATA table)(每个转储记录都获得 'NEW' 状态。成功处理记录后,它获得 'COMPLETED' 状态;否则 - 'FAILED' 状态)。在事务提交成功的情况下,这些记录中的每一个都用 RecordMessage 消息包装 class 并发送给下一个处理参与者;否则这些记录将被跳过——它们将在接下来的 n 秒后被重新读取。

有趣的三点:

  • 应用程序的灾难恢复。如果我们的应用程序将在处理过程中以某种方式停止怎么办。没问题,在应用程序启动时(@PostConstruct 标记的方法)我们在 DUMPED_DATA table 找到所有状态为 'NEW' 的记录,并借助存储的元数据重建从中恢复它们源系统。

  • 并行处理。所有的记录都转储成功后,就变得独立了,也就是说可以并行处理。我们介绍了几种并行机制和负载均衡机制。最简单的一种是循环方法。每个处理角色都由一个父角色(负载均衡器)和一组可配置的子角色(工作者)组成。当新消息到达父 actor 的队列时,它会将其分派给下一个 worker。

  • 防止重复记录。这是最有趣的一个。假设我们每 5 秒读取一次数据。如果有一个 actor 有一个长 运行 操作,则可能会从同一个 last-read-record 编号开始多次尝试从源系统的数据库中读取。因此,可能会转储和处理大量重复记录。为了防止这种情况,我们对 DumpActor 的消息添加了类似 CAS 的检查:如果消息的最后读取记录等于 DUMPED_DATA table 中的记录,则应处理该消息(之前没有处理任何消息);否则这条消息被拒绝。相当简单,但功能强大。

我希望这篇概述对某些人有所帮助。玩得开心!