使用 "executeAsync" 时如何限制对 cassandra 的写入请求?
How to throttle writes request to cassandra when working with "executeAsync"?
我正在使用 datastax java 驱动程序 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在以 QUORUM 一致性异步编写。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的保存方法将以非常快的速度从多个线程调用。
问题:
我想限制对异步写入 Cassandra 的 executeAsync
方法的请求。如果我以比我的 Cassandra 集群可以处理的速度非常高的速度写入,那么它将开始抛出错误,我希望我的所有写入都应该成功地进入 cassandra 而没有任何损失。
我看到这个 解决方案是使用 Semaphore
和固定数量的许可。但我不确定如何以及什么是最好的实现方式。我以前从未使用过信号量。这是逻辑。谁能提供一个基于我的代码的信号量示例,或者如果有更好的way/option,那么也让我知道。
In the context of writing a dataloader program, you could do something
like the following:
- To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight
requests). Whenever you go to submit a query using executeAsync,
acquire a permit. You should really only need 1 thread (but may want
to introduce a pool of # cpu cores size that does this) that acquires
the permits from the Semaphore and executes queries. It will just
block on acquire until there is an available permit.
- Use Futures.addCallback for the future returned from executeAsync. The callback should call Sempahore.release() in both onSuccess and
onFailure cases. By releasing a permit, this should allow your thread
in step 1 to continue and submit the next request.
我还看到其他几个 post 谈到使用 RingBuffer
或 Guava RateLimitter
那么我应该使用哪个更好?以下是我能想到的选项:
- 使用信号量
- 使用环形缓冲区
- 使用 Guava 速率限制器
任何人都可以帮我举例说明我们如何限制请求或为 cassandra 写入获取背压并确保所有写入成功进入 cassandra 吗?
不是权威答案,但也许会有所帮助。首先,您应该考虑当无法立即执行查询时您会怎么做。无论您选择哪种速率限制,如果您收到的请求速率高于您可以写入 Cassandra 的速率,最终您的进程都会被等待请求阻塞。在那一刻,您需要告诉您的客户暂时保留他们的请求 ("push back")。例如。如果他们是通过 HTTP 来的,那么响应状态将是 429 "Too Many Requests"。如果您在同一进程中生成请求,则决定可接受的最长超时时间。也就是说,如果 Cassandra 跟不上,那么是时候扩展(或调整)它了。
也许在实施速率限制之前,值得在调用 save
方法(使用 Thread.sleep(...))之前试验并在线程中添加人为延迟,看看它是否能解决您的问题或者需要其他东西。
查询返回错误是 来自 Cassandra 的背压。但是您可以选择或实施 RetryPolicy 来确定何时重试失败的查询。
你也可以看看connection pool options (and especially Monitoring and tuning the pool). One can tune number of asynchronous requests per connection。然而文档说对于 Cassandra 2.x 这个参数上限为 128 并且不应该改变它(不过我会尝试它:)
信号量的实现看起来像
/* Share it among all threads or associate with a thread for per-thread limits
Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20);
public void save(String process, int clientid, long deviceid) {
....
queryPermits.acquire(); // Blocks until a permit is available
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
queryPermits.release();
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
queryPermits.release(); // Permit should be released in all cases.
logger.logError("error= ", t);
}
}, executorService);
....
}
(在实际代码中,我会创建一个包装器回调来释放许可,然后调用包装的方法)
Guava 的 RateLimiter 类似于信号量,但允许在未充分利用期后临时爆发并根据时间限制请求(而不是活动查询的总数)。
然而,无论如何请求都会因各种原因而失败,所以最好制定一个如何重试它们的计划(以防出现间歇性错误)。
这可能不适合你的情况,但我会尝试使用一些队列或缓冲区来排队请求(例如 java.util.concurrent.ArrayBlockingQueue
)。 "Buffer full" 表示客户端应该等待或放弃请求。缓冲区也将用于重新排队失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试。当队列已满并且同时有新的失败请求时,还应该以某种方式处理这种情况。然后,单线程工作人员将从队列中挑选请求并将它们发送到 Cassandra。因为它不应该做太多,所以它不太可能成为瓶颈。该工作人员还可以应用自己的速率限制,例如基于 com.google.common.util.concurrent.RateLimiter
.
的时间
如果想尽可能避免丢失消息,他可以在 Cassandra 前面放置一个具有持久性的消息代理(例如 Kafka)。这样传入的消息甚至可以在 Cassandra 长时间中断后继续存在。但是,我想,这对你来说太过分了。
简单地使用阻塞队列就可以了。
Futures 是线程化的,那里的回调(成功和失败)将充当消费者,无论您从哪里调用 save 方法,都将充当生产者。
更好的方法是,您将完整的请求本身放入队列中,并在每次出队时将其逐一触发保存。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
queue.take();
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
queue.take();
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
public void invokeSaveInLoop(){
Object dummyObj = new Object();
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
for(int i=0; i< 1000; i++){
save("process", clientid, deviceid, queue);
queue.put(dummyObj);
}
}
如果您想进一步检查集群中途的负载
public static String getCurrentState(){
StringBuilder response = new StringBuilder();
response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
final LoadBalancingPolicy loadBalancingPolicy =
cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
final PoolingOptions poolingOptions =
cluster.getConfiguration().getPoolingOptions();
Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
HostDistance distance = loadBalancingPolicy.distance(host);
int connections = state.getOpenConnections(host);
int inFlightQueries = state.getInFlightQueries(host);
response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
connections *
poolingOptions.getMaxRequestsPerConnection(distance)))
.append("<br>\n");
}
return response.toString();
}
我正在使用 datastax java 驱动程序 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在以 QUORUM 一致性异步编写。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的保存方法将以非常快的速度从多个线程调用。
问题:
我想限制对异步写入 Cassandra 的 executeAsync
方法的请求。如果我以比我的 Cassandra 集群可以处理的速度非常高的速度写入,那么它将开始抛出错误,我希望我的所有写入都应该成功地进入 cassandra 而没有任何损失。
我看到这个 Semaphore
和固定数量的许可。但我不确定如何以及什么是最好的实现方式。我以前从未使用过信号量。这是逻辑。谁能提供一个基于我的代码的信号量示例,或者如果有更好的way/option,那么也让我知道。
In the context of writing a dataloader program, you could do something like the following:
- To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (but may want to introduce a pool of # cpu cores size that does this) that acquires the permits from the Semaphore and executes queries. It will just block on acquire until there is an available permit.
- Use Futures.addCallback for the future returned from executeAsync. The callback should call Sempahore.release() in both onSuccess and onFailure cases. By releasing a permit, this should allow your thread in step 1 to continue and submit the next request.
我还看到其他几个 post 谈到使用 RingBuffer
或 Guava RateLimitter
那么我应该使用哪个更好?以下是我能想到的选项:
- 使用信号量
- 使用环形缓冲区
- 使用 Guava 速率限制器
任何人都可以帮我举例说明我们如何限制请求或为 cassandra 写入获取背压并确保所有写入成功进入 cassandra 吗?
不是权威答案,但也许会有所帮助。首先,您应该考虑当无法立即执行查询时您会怎么做。无论您选择哪种速率限制,如果您收到的请求速率高于您可以写入 Cassandra 的速率,最终您的进程都会被等待请求阻塞。在那一刻,您需要告诉您的客户暂时保留他们的请求 ("push back")。例如。如果他们是通过 HTTP 来的,那么响应状态将是 429 "Too Many Requests"。如果您在同一进程中生成请求,则决定可接受的最长超时时间。也就是说,如果 Cassandra 跟不上,那么是时候扩展(或调整)它了。
也许在实施速率限制之前,值得在调用 save
方法(使用 Thread.sleep(...))之前试验并在线程中添加人为延迟,看看它是否能解决您的问题或者需要其他东西。
查询返回错误是 来自 Cassandra 的背压。但是您可以选择或实施 RetryPolicy 来确定何时重试失败的查询。
你也可以看看connection pool options (and especially Monitoring and tuning the pool). One can tune number of asynchronous requests per connection。然而文档说对于 Cassandra 2.x 这个参数上限为 128 并且不应该改变它(不过我会尝试它:)
信号量的实现看起来像
/* Share it among all threads or associate with a thread for per-thread limits
Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20);
public void save(String process, int clientid, long deviceid) {
....
queryPermits.acquire(); // Blocks until a permit is available
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
queryPermits.release();
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
queryPermits.release(); // Permit should be released in all cases.
logger.logError("error= ", t);
}
}, executorService);
....
}
(在实际代码中,我会创建一个包装器回调来释放许可,然后调用包装的方法)
Guava 的 RateLimiter 类似于信号量,但允许在未充分利用期后临时爆发并根据时间限制请求(而不是活动查询的总数)。
然而,无论如何请求都会因各种原因而失败,所以最好制定一个如何重试它们的计划(以防出现间歇性错误)。
这可能不适合你的情况,但我会尝试使用一些队列或缓冲区来排队请求(例如 java.util.concurrent.ArrayBlockingQueue
)。 "Buffer full" 表示客户端应该等待或放弃请求。缓冲区也将用于重新排队失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试。当队列已满并且同时有新的失败请求时,还应该以某种方式处理这种情况。然后,单线程工作人员将从队列中挑选请求并将它们发送到 Cassandra。因为它不应该做太多,所以它不太可能成为瓶颈。该工作人员还可以应用自己的速率限制,例如基于 com.google.common.util.concurrent.RateLimiter
.
如果想尽可能避免丢失消息,他可以在 Cassandra 前面放置一个具有持久性的消息代理(例如 Kafka)。这样传入的消息甚至可以在 Cassandra 长时间中断后继续存在。但是,我想,这对你来说太过分了。
简单地使用阻塞队列就可以了。 Futures 是线程化的,那里的回调(成功和失败)将充当消费者,无论您从哪里调用 save 方法,都将充当生产者。
更好的方法是,您将完整的请求本身放入队列中,并在每次出队时将其逐一触发保存。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
queue.take();
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
queue.take();
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
public void invokeSaveInLoop(){
Object dummyObj = new Object();
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
for(int i=0; i< 1000; i++){
save("process", clientid, deviceid, queue);
queue.put(dummyObj);
}
}
如果您想进一步检查集群中途的负载
public static String getCurrentState(){
StringBuilder response = new StringBuilder();
response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
final LoadBalancingPolicy loadBalancingPolicy =
cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
final PoolingOptions poolingOptions =
cluster.getConfiguration().getPoolingOptions();
Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
HostDistance distance = loadBalancingPolicy.distance(host);
int connections = state.getOpenConnections(host);
int inFlightQueries = state.getInFlightQueries(host);
response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
connections *
poolingOptions.getMaxRequestsPerConnection(distance)))
.append("<br>\n");
}
return response.toString();
}