客户端连接到 VerneMQ 集群
Client connection to VerneMQ cluster
我们设置了一个 VerneMQ 3 节点集群 (https://vernemq.com/docs/clustering/)。
问题是我没有找到任何关于如何将客户端连接到整个集群的示例。
例如对于单个 VerneMQ 实例,客户端连接 (in python) 是:
client.connect("xxx.xxx.xxx.xxx", 1883, 60)
其中"xxx.xxx.xxx.xxx"为单实例地址
如果是集群,有没有办法指定整个集群的地址,而不是指向单个节点?
没有"specific address of the whole cluster"这样的东西。 (对于 VerneMQ 和任何其他集群服务器软件)。
如果你需要单点联系,你需要在集群前面有一个负载均衡器。然后,您可以使用不同的负载平衡策略。
例如,VerneMQ 与 HAProxy 配合良好,并支持代理协议。
希望对您有所帮助。
我的客户端使用 Paho MQTT client library, I ended up in using the following 客户端 class 的扩展编写:
import random
from paho.mqtt.client import *
class ClusterClient(Client):
"""A subclass of paho.mqtt.Client that supports connecting to a cluster of
mqtt brokers. connect() and connect_async() additionally accept a list
of hostnames or host/port tuples:
connect("host1")
connect(["host1", "host2", "host3"]) # use default port 1883
connect(["host1", ("host2", 8883), ("host3", 8883)])
Hosts to connect to are chosen randomly. If a host disappears the client
automatically connects to another host from the list.
"""
def __init__(self, client_id="", clean_session=True, userdata=None,
protocol=MQTTv311, transport="tcp"):
super().__init__(client_id, clean_session, userdata, protocol, transport)
self._hosts = []
def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
if isinstance(host, (list, tuple)):
self._hosts = [(t, 1883) if isinstance(t, str) else t for t in host]
else:
self._hosts = [(host, port)]
for host, port in self._hosts:
if host is None or len(host) == 0:
raise ValueError('Invalid host.')
if port <= 0:
raise ValueError('Invalid port number.')
host, port = random.choice(self._hosts)
super().connect_async(host, port, keepalive, bind_address)
def reconnect(self):
hosts = self._hosts[:]
random.shuffle(hosts)
while True:
self._host, self._port = hosts.pop(0)
try:
return super().reconnect()
except socket.error:
if not hosts:
raise
我们设置了一个 VerneMQ 3 节点集群 (https://vernemq.com/docs/clustering/)。
问题是我没有找到任何关于如何将客户端连接到整个集群的示例。
例如对于单个 VerneMQ 实例,客户端连接 (in python) 是:
client.connect("xxx.xxx.xxx.xxx", 1883, 60)
其中"xxx.xxx.xxx.xxx"为单实例地址
如果是集群,有没有办法指定整个集群的地址,而不是指向单个节点?
没有"specific address of the whole cluster"这样的东西。 (对于 VerneMQ 和任何其他集群服务器软件)。
如果你需要单点联系,你需要在集群前面有一个负载均衡器。然后,您可以使用不同的负载平衡策略。
例如,VerneMQ 与 HAProxy 配合良好,并支持代理协议。
希望对您有所帮助。
我的客户端使用 Paho MQTT client library, I ended up in using the following 客户端 class 的扩展编写:
import random
from paho.mqtt.client import *
class ClusterClient(Client):
"""A subclass of paho.mqtt.Client that supports connecting to a cluster of
mqtt brokers. connect() and connect_async() additionally accept a list
of hostnames or host/port tuples:
connect("host1")
connect(["host1", "host2", "host3"]) # use default port 1883
connect(["host1", ("host2", 8883), ("host3", 8883)])
Hosts to connect to are chosen randomly. If a host disappears the client
automatically connects to another host from the list.
"""
def __init__(self, client_id="", clean_session=True, userdata=None,
protocol=MQTTv311, transport="tcp"):
super().__init__(client_id, clean_session, userdata, protocol, transport)
self._hosts = []
def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
if isinstance(host, (list, tuple)):
self._hosts = [(t, 1883) if isinstance(t, str) else t for t in host]
else:
self._hosts = [(host, port)]
for host, port in self._hosts:
if host is None or len(host) == 0:
raise ValueError('Invalid host.')
if port <= 0:
raise ValueError('Invalid port number.')
host, port = random.choice(self._hosts)
super().connect_async(host, port, keepalive, bind_address)
def reconnect(self):
hosts = self._hosts[:]
random.shuffle(hosts)
while True:
self._host, self._port = hosts.pop(0)
try:
return super().reconnect()
except socket.error:
if not hosts:
raise