Paho客户端限制?
Paho client limit?
我正在 运行 使用 mosquitto 代理和客户端以及 paho 客户端对 service/client 服务进行一些性能测试。我得到了一些奇怪的结果:
部署说明:
- 3台机器;生产者、经纪人、消费者
- 制作人:6 python 个脚本尽可能快地使用 mosquitto_pub。见下文。
- 消费者:简单 java 客户端显示如下。正在订阅所有主题。
- 硬件细节没有显示出显着差异。
1) Mosquitto 收到大约 1459.5055 messages/s 但它只发送 973.9596666666667。订户只得到 485.5458333333333 。
2) 无论创建多少 paho 客户端实例,性能都不会提高。例如。如果你 运行 在一个主题中有 6 个生产者,在两个主题中有 2 个消费者,你将得到 485.5458333333333。但是,如果您将 6 个生产者添加到另一个主题(已经检查消息总量是否增加),则总性能保持不变并且每个主题除以二。
3) 如果您对两个分开的 java 应用程序进行精确测试,性能不会下降。每个应用程序都获得最大性能。
在任何情况下 CPU 或内存都不会达到任何限制。
Producers.py
from datetime import datetime, date, time
import os,sys,time, json, random, itertools
arg = sys.argv
host="broker"
n=1
if len(arg)>1:
n = int(arg[1])
i=0
while True :
payload = {"id":str(n),"Time":datetime.now().strftime("%Y-%m-%dT%H:%M:%S.00Z"),"ResultValue":1.0,"ResultType":"integer","Datastream":{"id":str(n)}}
os.system( "mosquitto_pub -h "+host+" -t "+("/"+str(payload["id"])+" -m " +str(json.dumps(json.dumps(payload)))+"")
Consumer.java
package eu.linksmart.testing;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class Application implements MqttCallback {
public Application() {
id++;
}
public static void main(String[] args) {
try {
Application app = new Application();
create("1",new Application());
create("2",new Application());
while (true)
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (MqttException e) {
e.printStackTrace();
}
}
static void create(String id, Application app) throws MqttException {
MqttClient mqttClient = new MqttClient("tcp://broker:1883", UUID.randomUUID().toString(), new MemoryPersistence());
mqttClient.connect();
mqttClient.subscribe("/"+id+"/#", 1);
mqttClient.setCallback(app);
}
long acc =0;
int i=0;
long start= System.nanoTime();
static int id=0;
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
i++;
acc = (System.nanoTime()-start);
if(acc/1000000>1000){
start = System.nanoTime();
System.out.println(String.valueOf((i * 1000000000.0) / acc));
acc =0;
i=0;
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
例如运行话题 1 的制作人为:
python Producers.py 1&
什么限制了 java 应用程序中的 paho 客户端?
经过大量调试,我发现了问题所在。
主题 $SYS/broker/load/messages/received/1min
在我发送时报告了更多消息。可能将协议消息计为消息。因此,在闲置时,该主题正在与一位订阅者一起报告 3.22。所以我以为我发送的是 1459.5055 per/sec,这是由 mosquitto 报告的。但我只发送了 485.5458333333333。
所以不要相信这个主题的应用程序消息负载!
我正在 运行 使用 mosquitto 代理和客户端以及 paho 客户端对 service/client 服务进行一些性能测试。我得到了一些奇怪的结果:
部署说明:
- 3台机器;生产者、经纪人、消费者
- 制作人:6 python 个脚本尽可能快地使用 mosquitto_pub。见下文。
- 消费者:简单 java 客户端显示如下。正在订阅所有主题。
- 硬件细节没有显示出显着差异。
1) Mosquitto 收到大约 1459.5055 messages/s 但它只发送 973.9596666666667。订户只得到 485.5458333333333 。
2) 无论创建多少 paho 客户端实例,性能都不会提高。例如。如果你 运行 在一个主题中有 6 个生产者,在两个主题中有 2 个消费者,你将得到 485.5458333333333。但是,如果您将 6 个生产者添加到另一个主题(已经检查消息总量是否增加),则总性能保持不变并且每个主题除以二。
3) 如果您对两个分开的 java 应用程序进行精确测试,性能不会下降。每个应用程序都获得最大性能。
在任何情况下 CPU 或内存都不会达到任何限制。
Producers.py
from datetime import datetime, date, time
import os,sys,time, json, random, itertools
arg = sys.argv
host="broker"
n=1
if len(arg)>1:
n = int(arg[1])
i=0
while True :
payload = {"id":str(n),"Time":datetime.now().strftime("%Y-%m-%dT%H:%M:%S.00Z"),"ResultValue":1.0,"ResultType":"integer","Datastream":{"id":str(n)}}
os.system( "mosquitto_pub -h "+host+" -t "+("/"+str(payload["id"])+" -m " +str(json.dumps(json.dumps(payload)))+"")
Consumer.java
package eu.linksmart.testing;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class Application implements MqttCallback {
public Application() {
id++;
}
public static void main(String[] args) {
try {
Application app = new Application();
create("1",new Application());
create("2",new Application());
while (true)
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (MqttException e) {
e.printStackTrace();
}
}
static void create(String id, Application app) throws MqttException {
MqttClient mqttClient = new MqttClient("tcp://broker:1883", UUID.randomUUID().toString(), new MemoryPersistence());
mqttClient.connect();
mqttClient.subscribe("/"+id+"/#", 1);
mqttClient.setCallback(app);
}
long acc =0;
int i=0;
long start= System.nanoTime();
static int id=0;
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
i++;
acc = (System.nanoTime()-start);
if(acc/1000000>1000){
start = System.nanoTime();
System.out.println(String.valueOf((i * 1000000000.0) / acc));
acc =0;
i=0;
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
例如运行话题 1 的制作人为:
python Producers.py 1&
什么限制了 java 应用程序中的 paho 客户端?
经过大量调试,我发现了问题所在。
主题 $SYS/broker/load/messages/received/1min
在我发送时报告了更多消息。可能将协议消息计为消息。因此,在闲置时,该主题正在与一位订阅者一起报告 3.22。所以我以为我发送的是 1459.5055 per/sec,这是由 mosquitto 报告的。但我只发送了 485.5458333333333。
所以不要相信这个主题的应用程序消息负载!