集群解决方案中的 MQTT 消费者(akka,vert.x 或?)
MQTT consumer in cluster solution (akka, vert.x or ?)
我们正在启动一个项目,我们需要使用来自 MQTT 主题的一些消息,但我们的解决方案需要在具有多个 nodes/instances 的集群中工作 - 需要可扩展。
问题是,我们不确定要选择什么技术。我们面临这样的问题:换句话说,每个集群只需要 1 个消费者,我们需要避免两次消费消息(如果有 2 个节点)...akka 据报道在集群中支持单例,所以这可能有效,但我不确定如何将 akka 与 MQTT 集成。
另一种可能是vert.x。 akka 和 vert.x 都有到 camel 的桥接或者它们直接支持协议?我看到了那个顶点。有 MQTT 服务器集成,但我不太确定这是否正是我们需要的。它会像我想象的那样将骆驼包装到 vert.x 或 akka-cluster 吗?
据我所知,您应该能够在您的 akka 集群中创建一个单例 akka camel 消费者。你应该这样做:
定义 Akka Camel 消费者后:
object CamelConsumer {
def props = Props(new CamelConsumer())
}
class CamelConsumer extends Consumer {
def endpointUri = "mqtt://..."
def receive = {
//...
}
}
你可以这样定义你的单例:
context.actorOf(ClusterSingletonManager.props(
CamelConsumer.props,
PoisonPill,
ClusterSingletonManagerSettings(context.system)), "singletonConsumer")
val singletonConsumer = context.actorOf(ClusterSingletonProxy.props(
singletonManagerPath = "/user/app/singletonConsumer",
settings = ClusterSingletonProxySettings(system)),
name = "singletonConsumerProxy")
我没有使用 Vert.x 的经验。
可能值得查看 MQTT 共享订阅(一些代理如 MessageSight 或 HiveMQ 支持):http://www.hivemq.com/blog/mqtt-client-load-balancing-with-shared-subscriptions/
共享订阅是一种使用 MQTT 进行客户端负载平衡的机制,它允许使用一次消息,因为订阅在多个客户端之间共享。这可以通过 MQTT 标准机制实现。
您可以使用 Vert.x High Availability and Fail-Over 来确保您的 Verticle 的(单个)实例部署在集群中。
我们正在启动一个项目,我们需要使用来自 MQTT 主题的一些消息,但我们的解决方案需要在具有多个 nodes/instances 的集群中工作 - 需要可扩展。
问题是,我们不确定要选择什么技术。我们面临这样的问题:换句话说,每个集群只需要 1 个消费者,我们需要避免两次消费消息(如果有 2 个节点)...akka 据报道在集群中支持单例,所以这可能有效,但我不确定如何将 akka 与 MQTT 集成。
另一种可能是vert.x。 akka 和 vert.x 都有到 camel 的桥接或者它们直接支持协议?我看到了那个顶点。有 MQTT 服务器集成,但我不太确定这是否正是我们需要的。它会像我想象的那样将骆驼包装到 vert.x 或 akka-cluster 吗?
据我所知,您应该能够在您的 akka 集群中创建一个单例 akka camel 消费者。你应该这样做:
定义 Akka Camel 消费者后:
object CamelConsumer {
def props = Props(new CamelConsumer())
}
class CamelConsumer extends Consumer {
def endpointUri = "mqtt://..."
def receive = {
//...
}
}
你可以这样定义你的单例:
context.actorOf(ClusterSingletonManager.props(
CamelConsumer.props,
PoisonPill,
ClusterSingletonManagerSettings(context.system)), "singletonConsumer")
val singletonConsumer = context.actorOf(ClusterSingletonProxy.props(
singletonManagerPath = "/user/app/singletonConsumer",
settings = ClusterSingletonProxySettings(system)),
name = "singletonConsumerProxy")
我没有使用 Vert.x 的经验。
可能值得查看 MQTT 共享订阅(一些代理如 MessageSight 或 HiveMQ 支持):http://www.hivemq.com/blog/mqtt-client-load-balancing-with-shared-subscriptions/
共享订阅是一种使用 MQTT 进行客户端负载平衡的机制,它允许使用一次消息,因为订阅在多个客户端之间共享。这可以通过 MQTT 标准机制实现。
您可以使用 Vert.x High Availability and Fail-Over 来确保您的 Verticle 的(单个)实例部署在集群中。