Vert.x 使用 -cluster 选项启动时性能下降
Vert.x performance drop when starting with -cluster option
我想知道是否有人遇到过同样的问题。
我们有一个 Vert.x 应用程序,最终它的目的是将 6 亿行插入到 Cassandra 集群中。我们正在通过少量测试来测试 Vert.x 结合 Cassandra 的速度。
如果我们 运行 没有 -cluster 选项的 fat jar(使用 Shade 插件构建),我们可以在大约一分钟内插入 1000 万条记录。当我们添加 -cluster 选项时(最终我们将 运行 集群中的 Vert.x 应用程序)插入 1000 万条记录大约需要 5 分钟。
有人知道为什么吗?
我们知道 Hazelcast 配置会产生一些开销,但从未想过它会慢 5 倍。这意味着我们将需要在集群中有 5 个 EC2 实例才能在没有集群选项的情况下使用 1 个 EC2 获得相同的结果。
如前所述,EC2 实例上的所有内容 运行:
- t2.small
上的 2 个 Cassandra 服务器
- 1 Vert.x t2.2xlarge 上的服务器
只是添加项目的代码。我想这会有所帮助。
发件人垂直:
public class ProviderVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
IntStream.range(1, 30000001).parallel().forEach(i -> {
vertx.eventBus().send("clustertest1", Json.encode(new TestCluster1(i, "abc", LocalDateTime.now())));
});
}
@Override
public void stop() throws Exception {
super.stop();
}
}
和插入器verticle
public class ReceiverVerticle extends AbstractVerticle {
private int messagesReceived = 1;
private Session cassandraSession;
@Override
public void start() throws Exception {
PoolingOptions poolingOptions = new PoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
.setMaxConnectionsPerHost(HostDistance.LOCAL, 3)
.setCoreConnectionsPerHost(HostDistance.REMOTE, 1)
.setMaxConnectionsPerHost(HostDistance.REMOTE, 3)
.setMaxRequestsPerConnection(HostDistance.LOCAL, 20)
.setMaxQueueSize(32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 20);
Cluster cluster = Cluster.builder()
.withPoolingOptions(poolingOptions)
.addContactPoints(ClusterSetup.SEEDS)
.build();
System.out.println("Connecting session");
cassandraSession = cluster.connect("kiespees");
System.out.println("Session connected:\n\tcluster [" + cassandraSession.getCluster().getClusterName() + "]");
System.out.println("Connected hosts: ");
cassandraSession.getState().getConnectedHosts().forEach(host -> System.out.println(host.getAddress()));
PreparedStatement prepared = cassandraSession.prepare(
"insert into clustertest1 (id, value, created) " +
"values (:id, :value, :created)");
PreparedStatement preparedTimer = cassandraSession.prepare(
"insert into timer (name, created_on, amount) " +
"values (:name, :createdOn, :amount)");
BoundStatement timerStart = preparedTimer.bind()
.setString("name", "clusterteststart")
.setInt("amount", 0)
.setTimestamp("createdOn", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(timerStart);
EventBus bus = vertx.eventBus();
System.out.println("Bus info: " + bus.toString());
MessageConsumer<String> cons = bus.consumer("clustertest1");
System.out.println("Consumer info: " + cons.address());
System.out.println("Waiting for messages");
cons.handler(message -> {
TestCluster1 tc = Json.decodeValue(message.body(), TestCluster1.class);
if (messagesReceived % 100000 == 0)
System.out.println("Message received: " + messagesReceived);
BoundStatement boundRecord = prepared.bind()
.setInt("id", tc.getId())
.setString("value", tc.getValue())
.setTimestamp("created", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(boundRecord);
if (messagesReceived % 100000 == 0) {
BoundStatement timerStop = preparedTimer.bind()
.setString("name", "clusterteststop")
.setInt("amount", messagesReceived)
.setTimestamp("createdOn", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(timerStop);
}
messagesReceived++;
//message.reply("OK");
});
}
@Override
public void stop() throws Exception {
super.stop();
cassandraSession.close();
}
}
当您为应用程序启用(任何类型的)集群时,您使应用程序更能应对故障,但同时也增加了性能损失。
例如,您当前的流程(没有聚类)类似于:
client ->
vert.x app ->
in memory same process eventbus (negletible) ->
handler -> cassandra
<- vert.x app
<- client
启用集群后:
client ->
vert.x app ->
serialize request ->
network request cluster member ->
deserialize request ->
handler -> cassandra
<- serialize response
<- network reply
<- deserialize response
<- vert.x app
<- client
如您所见,需要进行许多编码解码操作以及多次网络调用,所有这些都会添加到您的总请求时间中。
为了获得最佳性能,您需要利用局部性,离数据存储越近通常速度越快。
您实际上 运行 陷入了 Vert.x Hazelcast 集群管理器的极端情况。
首先,您正在使用一个 worker Verticle 来发送您的消息 (30000001)。 Hazelcast 在后台是阻塞的,因此当您从工作人员发送消息时,版本 3.3.3 不会考虑到这一点。最近我们添加了此修复程序 https://github.com/vert-x3/issues/issues/75(未出现在 3.4.0.Beta1 中,但出现在 3.4.0-SNAPSHOTS 中)将改善这种情况。
其次,当您同时发送所有消息时,它会遇到另一个极端情况,阻止 Hazelcast 集群管理器使用集群拓扑的缓存。此拓扑缓存通常在发送第一条消息后更新,并且一次发送所有消息会阻止使用缓存(简短说明 HazelcastAsyncMultiMap#getInProgressCount 将 > 0 并阻止使用缓存),因此要付出代价昂贵的查找(因此缓存)。
如果我将 Bertjan 的复制器与 3.4.0-SNAPSHOT + Hazelcast 一起使用并进行以下更改:将消息发送到目的地,等待回复。回复后发送所有消息然后我得到了很多改进。
没有聚类:5852 毫秒
使用 HZ 3.3.3 集群:16745 毫秒
使用 HZ 3.4.0-SNAPSHOT + 初始消息进行集群:8609 ms
我也认为您不应该使用 worker Verticle 发送那么多消息,而是使用事件循环 Verticle 批量发送它们。也许你应该解释你的用例,我们可以考虑解决它的最佳方法。
我想知道是否有人遇到过同样的问题。
我们有一个 Vert.x 应用程序,最终它的目的是将 6 亿行插入到 Cassandra 集群中。我们正在通过少量测试来测试 Vert.x 结合 Cassandra 的速度。
如果我们 运行 没有 -cluster 选项的 fat jar(使用 Shade 插件构建),我们可以在大约一分钟内插入 1000 万条记录。当我们添加 -cluster 选项时(最终我们将 运行 集群中的 Vert.x 应用程序)插入 1000 万条记录大约需要 5 分钟。
有人知道为什么吗?
我们知道 Hazelcast 配置会产生一些开销,但从未想过它会慢 5 倍。这意味着我们将需要在集群中有 5 个 EC2 实例才能在没有集群选项的情况下使用 1 个 EC2 获得相同的结果。
如前所述,EC2 实例上的所有内容 运行:
- t2.small 上的 2 个 Cassandra 服务器
- 1 Vert.x t2.2xlarge 上的服务器
只是添加项目的代码。我想这会有所帮助。
发件人垂直:
public class ProviderVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
IntStream.range(1, 30000001).parallel().forEach(i -> {
vertx.eventBus().send("clustertest1", Json.encode(new TestCluster1(i, "abc", LocalDateTime.now())));
});
}
@Override
public void stop() throws Exception {
super.stop();
}
}
和插入器verticle
public class ReceiverVerticle extends AbstractVerticle {
private int messagesReceived = 1;
private Session cassandraSession;
@Override
public void start() throws Exception {
PoolingOptions poolingOptions = new PoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
.setMaxConnectionsPerHost(HostDistance.LOCAL, 3)
.setCoreConnectionsPerHost(HostDistance.REMOTE, 1)
.setMaxConnectionsPerHost(HostDistance.REMOTE, 3)
.setMaxRequestsPerConnection(HostDistance.LOCAL, 20)
.setMaxQueueSize(32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 20);
Cluster cluster = Cluster.builder()
.withPoolingOptions(poolingOptions)
.addContactPoints(ClusterSetup.SEEDS)
.build();
System.out.println("Connecting session");
cassandraSession = cluster.connect("kiespees");
System.out.println("Session connected:\n\tcluster [" + cassandraSession.getCluster().getClusterName() + "]");
System.out.println("Connected hosts: ");
cassandraSession.getState().getConnectedHosts().forEach(host -> System.out.println(host.getAddress()));
PreparedStatement prepared = cassandraSession.prepare(
"insert into clustertest1 (id, value, created) " +
"values (:id, :value, :created)");
PreparedStatement preparedTimer = cassandraSession.prepare(
"insert into timer (name, created_on, amount) " +
"values (:name, :createdOn, :amount)");
BoundStatement timerStart = preparedTimer.bind()
.setString("name", "clusterteststart")
.setInt("amount", 0)
.setTimestamp("createdOn", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(timerStart);
EventBus bus = vertx.eventBus();
System.out.println("Bus info: " + bus.toString());
MessageConsumer<String> cons = bus.consumer("clustertest1");
System.out.println("Consumer info: " + cons.address());
System.out.println("Waiting for messages");
cons.handler(message -> {
TestCluster1 tc = Json.decodeValue(message.body(), TestCluster1.class);
if (messagesReceived % 100000 == 0)
System.out.println("Message received: " + messagesReceived);
BoundStatement boundRecord = prepared.bind()
.setInt("id", tc.getId())
.setString("value", tc.getValue())
.setTimestamp("created", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(boundRecord);
if (messagesReceived % 100000 == 0) {
BoundStatement timerStop = preparedTimer.bind()
.setString("name", "clusterteststop")
.setInt("amount", messagesReceived)
.setTimestamp("createdOn", new Timestamp(new Date().getTime()));
cassandraSession.executeAsync(timerStop);
}
messagesReceived++;
//message.reply("OK");
});
}
@Override
public void stop() throws Exception {
super.stop();
cassandraSession.close();
}
}
当您为应用程序启用(任何类型的)集群时,您使应用程序更能应对故障,但同时也增加了性能损失。
例如,您当前的流程(没有聚类)类似于:
client ->
vert.x app ->
in memory same process eventbus (negletible) ->
handler -> cassandra
<- vert.x app
<- client
启用集群后:
client ->
vert.x app ->
serialize request ->
network request cluster member ->
deserialize request ->
handler -> cassandra
<- serialize response
<- network reply
<- deserialize response
<- vert.x app
<- client
如您所见,需要进行许多编码解码操作以及多次网络调用,所有这些都会添加到您的总请求时间中。
为了获得最佳性能,您需要利用局部性,离数据存储越近通常速度越快。
您实际上 运行 陷入了 Vert.x Hazelcast 集群管理器的极端情况。
首先,您正在使用一个 worker Verticle 来发送您的消息 (30000001)。 Hazelcast 在后台是阻塞的,因此当您从工作人员发送消息时,版本 3.3.3 不会考虑到这一点。最近我们添加了此修复程序 https://github.com/vert-x3/issues/issues/75(未出现在 3.4.0.Beta1 中,但出现在 3.4.0-SNAPSHOTS 中)将改善这种情况。
其次,当您同时发送所有消息时,它会遇到另一个极端情况,阻止 Hazelcast 集群管理器使用集群拓扑的缓存。此拓扑缓存通常在发送第一条消息后更新,并且一次发送所有消息会阻止使用缓存(简短说明 HazelcastAsyncMultiMap#getInProgressCount 将 > 0 并阻止使用缓存),因此要付出代价昂贵的查找(因此缓存)。
如果我将 Bertjan 的复制器与 3.4.0-SNAPSHOT + Hazelcast 一起使用并进行以下更改:将消息发送到目的地,等待回复。回复后发送所有消息然后我得到了很多改进。
没有聚类:5852 毫秒 使用 HZ 3.3.3 集群:16745 毫秒 使用 HZ 3.4.0-SNAPSHOT + 初始消息进行集群:8609 ms
我也认为您不应该使用 worker Verticle 发送那么多消息,而是使用事件循环 Verticle 批量发送它们。也许你应该解释你的用例,我们可以考虑解决它的最佳方法。