将存储在 table 中的工作分配给多个进程
Distribute work stored in table to multiple processes
我有一个数据库 table,其中每一行代表一项要完成的工作。本次table 补up/receive 休息API。除了负责这项工作的休息服务外,我还有另一项使用演员来处理这项工作的服务。
我需要有关将这项工作平均分配给这些工人的建议。这项工作不是一次性的,它是每隔一段时间完成一次,直到用户删除它。
因此我需要一种机制
工作分配均匀。
如果第二个服务(工作消费者)失败,它可以使用 table 中的所有记录再次启动并再次分发工作。
解决方案的大纲是使用 Akka Cluster、Cluster Sharding 和 Akk Cluster Singleton。当集群被认为形成时(通常当一些最小数量的成员加入集群时),你启动集群分片系统(通过数据库的主键分片工作项)然后集群单例将读取数据库 table并将工作项发送到 Cluster Sharding,以便在集群的节点之间分发。 Akka Streams,尤其是 Alpakka 的 Slick JDBC 集成可能在单例中很有用。另一个定期检查作业的集群单例也可能有助于从集群节点故障中恢复(但请参阅下面的内容以了解那里的情况)。
两个注意事项:
如果使用 Cluster Sharding 和 Cluster Singleton,您可能需要考虑在裂脑情况下会发生什么:这是一个分布式系统,可以推测最终发生裂脑的概率为 100%。在裂脑场景中,您很可能会在分裂的不同端同时执行相同的工作,因此您需要询问在您的用例中这是否是 acceptable。
如果没有,那么您将需要一个组件来监视集群中节点之间的通信以检测裂脑并采取措施解决该情况:Lightbend 的 Split Brain Resolver 是一个不错的选择,如果您不是'有兴趣自己实现这个。
与此相关,如果作业包含许多必须执行的步骤,那么要问的问题是,如果集群或节点在完成后失败,比如说,十个步骤中的八个,是它接受 table 重做步骤 1-8 与从步骤 9 开始?如果对此的回答是“否”,那么您将需要保留作业的中间状态。 Akka Persistence 在这里是一个不错的选择,尽管您可能想阅读有关事件溯源的内容。如果将 Persistence 与 Cluster Sharding 和 Cluster Singleton 一起使用,应该注意,您几乎肯定需要处理裂脑(请参阅上一项)。
- 每个演员代表作品的一行table。
class WorkActor(workId: String)(implicit system: ActorSystem, materializer: ActorMaterializer) extends Actor {
// read the record from table or whereever you want to read
override def preStart(): Unit = {
logger.info("WorkActor start ===> " + self)
}
override def receive: Receive = {
case _ => {}
}
}
- 创建一个 Akka 集群分片区域以将来自 rest api 的请求分派给相应的 actor。调用 startShardingRegion 函数以 return 一个 actorRef。然后你可以通过restAPI把消息发给这个sharding actorRef,然后相应的会帮你处理这个消息。
final case class CommandEnvelope(id: String, payload: Any)
def startShardingRegion(role: String)(implicit system: ActorSystem) = {
ClusterSharding(system).start(
typeName = role,
entityProps = Props(classOf[WorkActor]),
settings = ClusterShardingSettings(system),
extractEntityId = ClusterConfig.extractEntityId,
extractShardId = ClusterConfig.extractShardId
)
}
// sharding key
object ClusterConfig {
private val numberOfShards = 100
val extractEntityId: ShardRegion.ExtractEntityId = {
case CommandEnvelope(id, payload) => (id, payload)
}
val extractShardId: ShardRegion.ExtractShardId = {
case CommandEnvelope(id, _) => (id.hashCode % numberOfShards).toString
case ShardRegion.StartEntity(id) => (id.hashCode % numberOfShards).toString
}
}
从 actor 中的 preStart 函数中读取或恢复数据。有很多选择。 MQ(Kafka)、Akka持久化(RDS、Cassandra)等未完之作可以阅读
SBR 有开源解决方案。如果您的业务逻辑可行,那将是一个高级主题。
https://github.com/TanUkkii007/akka-cluster-custom-downing
我有一个数据库 table,其中每一行代表一项要完成的工作。本次table 补up/receive 休息API。除了负责这项工作的休息服务外,我还有另一项使用演员来处理这项工作的服务。
我需要有关将这项工作平均分配给这些工人的建议。这项工作不是一次性的,它是每隔一段时间完成一次,直到用户删除它。
因此我需要一种机制
工作分配均匀。
如果第二个服务(工作消费者)失败,它可以使用 table 中的所有记录再次启动并再次分发工作。
解决方案的大纲是使用 Akka Cluster、Cluster Sharding 和 Akk Cluster Singleton。当集群被认为形成时(通常当一些最小数量的成员加入集群时),你启动集群分片系统(通过数据库的主键分片工作项)然后集群单例将读取数据库 table并将工作项发送到 Cluster Sharding,以便在集群的节点之间分发。 Akka Streams,尤其是 Alpakka 的 Slick JDBC 集成可能在单例中很有用。另一个定期检查作业的集群单例也可能有助于从集群节点故障中恢复(但请参阅下面的内容以了解那里的情况)。
两个注意事项:
如果使用 Cluster Sharding 和 Cluster Singleton,您可能需要考虑在裂脑情况下会发生什么:这是一个分布式系统,可以推测最终发生裂脑的概率为 100%。在裂脑场景中,您很可能会在分裂的不同端同时执行相同的工作,因此您需要询问在您的用例中这是否是 acceptable。
如果没有,那么您将需要一个组件来监视集群中节点之间的通信以检测裂脑并采取措施解决该情况:Lightbend 的 Split Brain Resolver 是一个不错的选择,如果您不是'有兴趣自己实现这个。
与此相关,如果作业包含许多必须执行的步骤,那么要问的问题是,如果集群或节点在完成后失败,比如说,十个步骤中的八个,是它接受 table 重做步骤 1-8 与从步骤 9 开始?如果对此的回答是“否”,那么您将需要保留作业的中间状态。 Akka Persistence 在这里是一个不错的选择,尽管您可能想阅读有关事件溯源的内容。如果将 Persistence 与 Cluster Sharding 和 Cluster Singleton 一起使用,应该注意,您几乎肯定需要处理裂脑(请参阅上一项)。
- 每个演员代表作品的一行table。
class WorkActor(workId: String)(implicit system: ActorSystem, materializer: ActorMaterializer) extends Actor {
// read the record from table or whereever you want to read
override def preStart(): Unit = {
logger.info("WorkActor start ===> " + self)
}
override def receive: Receive = {
case _ => {}
}
}
- 创建一个 Akka 集群分片区域以将来自 rest api 的请求分派给相应的 actor。调用 startShardingRegion 函数以 return 一个 actorRef。然后你可以通过restAPI把消息发给这个sharding actorRef,然后相应的会帮你处理这个消息。
final case class CommandEnvelope(id: String, payload: Any)
def startShardingRegion(role: String)(implicit system: ActorSystem) = {
ClusterSharding(system).start(
typeName = role,
entityProps = Props(classOf[WorkActor]),
settings = ClusterShardingSettings(system),
extractEntityId = ClusterConfig.extractEntityId,
extractShardId = ClusterConfig.extractShardId
)
}
// sharding key
object ClusterConfig {
private val numberOfShards = 100
val extractEntityId: ShardRegion.ExtractEntityId = {
case CommandEnvelope(id, payload) => (id, payload)
}
val extractShardId: ShardRegion.ExtractShardId = {
case CommandEnvelope(id, _) => (id.hashCode % numberOfShards).toString
case ShardRegion.StartEntity(id) => (id.hashCode % numberOfShards).toString
}
}
从 actor 中的 preStart 函数中读取或恢复数据。有很多选择。 MQ(Kafka)、Akka持久化(RDS、Cassandra)等未完之作可以阅读
SBR 有开源解决方案。如果您的业务逻辑可行,那将是一个高级主题。
https://github.com/TanUkkii007/akka-cluster-custom-downing