Why is my Cassandra prepared statement data ingest so slow?
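I have a Java list of 100,000 names that I want to ingest into a 3-node Cassandra cluster running DataStax Enterprise 5.1 with Cassandra 3.10.0.
My code ingests the data, but it takes a very long time. I ran a stress test against the cluster and it sustained over 25,000 writes per second. My ingest code manages only about 200 writes per second.
The list of 100,000 names is called myList. I use the following prepared statement and session.execute call to ingest the data: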
PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");
int id = 0;
for (int i = 0; i < myList.size(); i++) {
    id += 1;
    session.execute(prepared.bind(id, myList.get(i)));
}
I added a cluster monitor to my code to see what was going on. This is the monitoring code:
// Monitor the status of the cluster.
// poolingOptions is assumed to be the PoolingOptions the cluster was built with (not shown).
final LoadBalancingPolicy loadBalancingPolicy =
    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
ScheduledExecutorService scheduled =
    Executors.newScheduledThreadPool(1);
scheduled.scheduleAtFixedRate(() -> {
    Session.State state = session.getState();
    state.getConnectedHosts().forEach((host) -> {
        HostDistance distance = loadBalancingPolicy.distance(host);
        int connections = state.getOpenConnections(host);
        int inFlightQueries = state.getInFlightQueries(host);
        System.out.printf("%s connections=%d, current load=%d, maxload=%d%n",
            host, connections, inFlightQueries,
            connections *
                poolingOptions.getMaxRequestsPerConnection(distance));
    });
}, 5, 5, TimeUnit.SECONDS);
The monitor prints every 5 seconds. Three consecutive iterations looked like this:
/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=0, maxload=32768
/192.168.20.26:9042 connections=1, current load=1, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
It looks like I am not using my cluster very effectively. I am not sure what I am doing wrong and would greatly appreciate any tips.
Thanks!
Use executeAsync.
Executes the provided query asynchronously. This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the ResultSetFuture.
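As a rough illustration of why this helps: the monitor output in the question never shows more than one request in flight across the whole cluster, so the synchronous loop is limited by round-trip latency, not by cluster capacity. The sketch below is only a back-of-the-envelope estimate. The class and method names are hypothetical, and it assumes per-request latency stays constant as concurrency grows, which a real cluster will not fully honor.

```java
public class ThroughputEstimate {
    // Little's law rearranged: throughput = requests in flight / latency per request.
    public static double requestsPerSecond(int inFlight, double latencyMillis) {
        return inFlight * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // 200 writes/s with one request in flight implies about 5 ms per round trip.
        double latencyMillis = 1000.0 / 200.0;
        System.out.println(requestsPerSecond(1, latencyMillis));   // prints 200.0
        System.out.println(requestsPerSecond(100, latencyMillis)); // prints 20000.0
    }
}
```

Under that assumption, around 100 concurrent requests would already put the ingest in the same range as the 25,000 writes per second seen in the stress test.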
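You are inserting a large amount of data. If you fire executeAsync calls without any limit and your cluster cannot keep up with that volume, requests may fail with exceptions. You can throttle executeAsync with a Semaphore.
Example: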
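PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");
int numberOfConcurrentQueries = 100;
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);
int id = 0;
for (int i = 0; i < myList.size(); i++) {
    id += 1;
    try {
        // Blocks while numberOfConcurrentQueries requests are already in flight.
        semaphore.acquire();
    } catch (InterruptedException e) {
        // No permit was acquired, so there is nothing to release.
        Thread.currentThread().interrupt();
        break;
    }
    try {
        ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i)));
        // With newer Guava versions, pass an Executor as a third argument to addCallback.
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                semaphore.release();
            }
            @Override
            public void onFailure(Throwable t) {
                // The write failed: free the permit and report the error.
                semaphore.release();
                t.printStackTrace();
            }
        });
    } catch (Exception e) {
        // executeAsync threw before a callback was registered, so release here.
        semaphore.release();
        e.printStackTrace();
    }
}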
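One thing the loop above does not do is wait for the last in-flight writes before the program moves on. A minimal, driver-free sketch of the same throttling pattern with a final drain step follows. Everything in it is an assumption made for illustration: fakeWriteAsync is a hypothetical stand-in for session.executeAsync that simply completes after 2 ms, and the class and field names are made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledAsyncDemo {
    public final AtomicInteger inFlight = new AtomicInteger();
    public final AtomicInteger maxInFlight = new AtomicInteger();
    public final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for session.executeAsync: completes after a short delay.
    private CompletableFuture<Void> fakeWriteAsync(ScheduledExecutorService timer) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        timer.schedule(() -> {
            inFlight.decrementAndGet();
            future.complete(null);
        }, 2, TimeUnit.MILLISECONDS);
        return future;
    }

    public void run(int total, int limit) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        Semaphore permits = new Semaphore(limit);
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // blocks once `limit` requests are in flight
                fakeWriteAsync(timer).whenComplete((result, error) -> {
                    completed.incrementAndGet();
                    permits.release(); // released on success and on failure
                });
            }
            // Drain: holding every permit means every request has finished.
            permits.acquire(limit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while ingesting", e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        ThrottledAsyncDemo demo = new ThrottledAsyncDemo();
        demo.run(200, 10);
        System.out.println("completed=" + demo.completed.get()
            + ", maxInFlight=" + demo.maxInFlight.get());
    }
}
```

The drain step works because permits are only returned from the completion callback, so acquiring all of them at the end is equivalent to waiting for every outstanding request. The same idea should carry over to the driver code by calling semaphore.acquire(numberOfConcurrentQueries) after the loop.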