Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句
How Cassandra handle blocking execute statement in datastax java driver
正在阻止 com.datastax.driver.core.Session
的执行方法
public ResultSet execute(Statement statement);
对该方法的评论:
This method blocks until at least some result has been received from
the database. However, for SELECT queries, it does not guarantee that
the result has been received in full. But it does guarantee that some
response has been received from the database, and in particular
guarantee that if the request is invalid, an exception will be thrown
by this method.
来自com.datastax.driver.core.Session
的非阻塞执行方法
public ResultSetFuture executeAsync(Statement statement);
This method does not block. It returns as soon as the query has been
passed to the underlying network stack. In particular, returning from
this method does not guarantee that the query is valid or has even
been submitted to a live node. Any exception pertaining to the failure
of the query will be thrown when accessing the {@link
ResultSetFuture}.
我有02个关于它们的问题,如果你能帮助我理解它们就太好了。
假设我有 100 万条记录,我希望所有记录都到达数据库(没有任何丢失)。
问题 1:如果我有 n 个线程,所有线程将有相同数量的记录,它们需要发送到数据库。他们都使用阻塞执行调用继续向 cassandra 发送多个插入查询。如果我增加 n 的值,它是否也有助于加快我需要将所有记录插入到 cassandra 的时间?
这会导致 cassandra 出现性能问题吗? Cassandra 是否必须确保对于每条插入记录,集群中的所有节点都应该立即知道新记录?为了保持数据的一致性。 (我假设 cassandra 节点甚至不会考虑使用本地机器时间来控制记录插入时间)。
问题2:使用非阻塞执行,如何确保所有的插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行情况。有什么更好的办法吗?非阻塞执行是否比阻塞执行更容易失败?
非常感谢您的帮助。
If I have n number of threads, all threads will have the same amount of records they need to send to the database. All of them continue sending multiple insert queries to cassandra using blocking execute call. If I increase the value of n, will it also helps to speed up the time that I need to insert all records to cassandra?
在某种程度上。让我们稍微脱离客户端实现细节,从 "Number of concurrent requests" 的角度来看问题,因为如果使用 executeAsync,则不需要为每个正在进行的请求都创建一个线程。在我的测试中,我发现虽然拥有大量并发请求有很多价值,但存在一个阈值,returns 或性能开始下降。我的一般经验法则是 (number of Nodes *
native_transport_max_threads (default: 128)
* 2)
,但您可能会发现更多或更少的最佳结果。
这里的想法是,将比 cassandra 一次处理的请求更多的请求排队没有多大价值。在减少飞行请求数量的同时,您可以限制驱动程序客户端和 cassandra 之间不必要的连接拥塞。
Question 2: With non-blocking execute, how can I assure that all of the insertions is successful? The only way I know is waiting for the ResultSetFuture to check the execution of the insert query. Is there any better way I can do ? Is there a higher chance that non-blocking execute is easier to fail then blocking execute?
通过 get
在 ResultSetFuture 上等待是一种方法,但如果您正在开发完全异步的应用程序,则需要尽可能避免阻塞。使用番石榴,你最好的两把武器是 Futures.addCallback
and Futures.transform
.
Futures.addCallback
允许您注册一个 FutureCallback
,当驱动程序收到响应时执行。 onSuccess
在成功的情况下被执行,onFailure
否则。
Futures.transform
允许您有效地将返回的 ResultSetFuture
映射到其他内容。例如,如果您只想要 1 列的值,则可以使用它将 ListenableFuture<ResultSet>
转换为 ListenableFuture<String>
,而不必在 ResultSetFuture
上阻塞代码,然后获取字符串值。
在编写数据加载程序的上下文中,您可以执行如下操作:
- 为简单起见,请使用
Semaphore
或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用 executeAsync
提交查询时,请获得许可。您实际上应该只需要 1 个线程(但可能想引入一个 # cpu 核心大小的池来执行此操作),它从信号量获取许可并执行查询。它只会在获得可用许可之前阻塞。
- 对从
executeAsync
返回的未来使用 Futures.addCallback
。在 onSuccess
和 onFailure
两种情况下,回调都应调用 Sempahore.release()
。通过释放许可,这应该允许您在步骤 1 中的线程继续并提交下一个请求。
要进一步提高吞吐量,您可能需要考虑使用 BatchStatement
并批量提交请求。如果您保持较小的批次(50-250 是一个不错的数字)并且批次中的插入都共享相同的分区键,那么这是一个不错的选择。
除了上面的回答,
看起来 execute() 调用 executeAsync(statement).getUninterruptibly(),因此您是否使用 execute() 管理自己的 "n thread pool" 并阻止自己直到执行完成最多 n 运行宁线程或在所有记录上使用 executeAsync(),cassandra 端性能应该大致相同,具体取决于执行 time/count + 超时。
他们的执行将从池中借用所有 运行 个连接,每个执行在客户端都有一个 streamId,并在响应返回此 streamId 时通过未来通知您,受每个连接的总请求数限制客户端和总请求数受限于被选中执行请求的每个节点上的读取线程,任何更高的数字都将缓冲在受连接 maxQueueSize 和 maxRequestsPerConnection 限制的队列中(未阻塞),任何高于此值的数字都将失败。这样做的好处是 executeAsync() 不会在每个 request/execution.
的新线程上 运行
因此,必须限制通过 execute() 或 executeAsync() 可以请求的数量 运行,在 execute() 中您要避免超出这些限制。
在性能方面,您将开始看到超出每个节点可以处理的惩罚,因此具有良好大小池的 execute() 对我来说很有意义。更好的是,使用反应式架构来避免创建太多除了等待什么都不做的线程,如此多的线程会导致客户端浪费上下文切换。对于较少数量的请求,通过避免线程池,executeAsync() 会更好。
DefaultResultSetFuture future = new DefaultResultSetFuture(..., makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();
正在阻止 com.datastax.driver.core.Session
的执行方法public ResultSet execute(Statement statement);
对该方法的评论:
This method blocks until at least some result has been received from the database. However, for SELECT queries, it does not guarantee that the result has been received in full. But it does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method.
来自com.datastax.driver.core.Session
的非阻塞执行方法public ResultSetFuture executeAsync(Statement statement);
This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the {@link ResultSetFuture}.
我有02个关于它们的问题,如果你能帮助我理解它们就太好了。
假设我有 100 万条记录,我希望所有记录都到达数据库(没有任何丢失)。
问题 1:如果我有 n 个线程,所有线程将有相同数量的记录,它们需要发送到数据库。他们都使用阻塞执行调用继续向 cassandra 发送多个插入查询。如果我增加 n 的值,它是否也有助于加快我需要将所有记录插入到 cassandra 的时间?
这会导致 cassandra 出现性能问题吗? Cassandra 是否必须确保对于每条插入记录,集群中的所有节点都应该立即知道新记录?为了保持数据的一致性。 (我假设 cassandra 节点甚至不会考虑使用本地机器时间来控制记录插入时间)。
问题2:使用非阻塞执行,如何确保所有的插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行情况。有什么更好的办法吗?非阻塞执行是否比阻塞执行更容易失败?
非常感谢您的帮助。
If I have n number of threads, all threads will have the same amount of records they need to send to the database. All of them continue sending multiple insert queries to cassandra using blocking execute call. If I increase the value of n, will it also helps to speed up the time that I need to insert all records to cassandra?
在某种程度上。让我们稍微脱离客户端实现细节,从 "Number of concurrent requests" 的角度来看问题,因为如果使用 executeAsync,则不需要为每个正在进行的请求都创建一个线程。在我的测试中,我发现虽然拥有大量并发请求有很多价值,但存在一个阈值,returns 或性能开始下降。我的一般经验法则是 (number of Nodes *
native_transport_max_threads (default: 128)
* 2)
,但您可能会发现更多或更少的最佳结果。
这里的想法是,将比 cassandra 一次处理的请求更多的请求排队没有多大价值。在减少飞行请求数量的同时,您可以限制驱动程序客户端和 cassandra 之间不必要的连接拥塞。
Question 2: With non-blocking execute, how can I assure that all of the insertions is successful? The only way I know is waiting for the ResultSetFuture to check the execution of the insert query. Is there any better way I can do ? Is there a higher chance that non-blocking execute is easier to fail then blocking execute?
通过 get
在 ResultSetFuture 上等待是一种方法,但如果您正在开发完全异步的应用程序,则需要尽可能避免阻塞。使用番石榴,你最好的两把武器是 Futures.addCallback
and Futures.transform
.
Futures.addCallback
允许您注册一个FutureCallback
,当驱动程序收到响应时执行。onSuccess
在成功的情况下被执行,onFailure
否则。Futures.transform
允许您有效地将返回的ResultSetFuture
映射到其他内容。例如,如果您只想要 1 列的值,则可以使用它将ListenableFuture<ResultSet>
转换为ListenableFuture<String>
,而不必在ResultSetFuture
上阻塞代码,然后获取字符串值。
在编写数据加载程序的上下文中,您可以执行如下操作:
- 为简单起见,请使用
Semaphore
或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用executeAsync
提交查询时,请获得许可。您实际上应该只需要 1 个线程(但可能想引入一个 # cpu 核心大小的池来执行此操作),它从信号量获取许可并执行查询。它只会在获得可用许可之前阻塞。 - 对从
executeAsync
返回的未来使用Futures.addCallback
。在onSuccess
和onFailure
两种情况下,回调都应调用Sempahore.release()
。通过释放许可,这应该允许您在步骤 1 中的线程继续并提交下一个请求。
要进一步提高吞吐量,您可能需要考虑使用 BatchStatement
并批量提交请求。如果您保持较小的批次(50-250 是一个不错的数字)并且批次中的插入都共享相同的分区键,那么这是一个不错的选择。
除了上面的回答,
看起来 execute() 调用 executeAsync(statement).getUninterruptibly(),因此您是否使用 execute() 管理自己的 "n thread pool" 并阻止自己直到执行完成最多 n 运行宁线程或在所有记录上使用 executeAsync(),cassandra 端性能应该大致相同,具体取决于执行 time/count + 超时。
他们的执行将从池中借用所有 运行 个连接,每个执行在客户端都有一个 streamId,并在响应返回此 streamId 时通过未来通知您,受每个连接的总请求数限制客户端和总请求数受限于被选中执行请求的每个节点上的读取线程,任何更高的数字都将缓冲在受连接 maxQueueSize 和 maxRequestsPerConnection 限制的队列中(未阻塞),任何高于此值的数字都将失败。这样做的好处是 executeAsync() 不会在每个 request/execution.
的新线程上 运行因此,必须限制通过 execute() 或 executeAsync() 可以请求的数量 运行,在 execute() 中您要避免超出这些限制。
在性能方面,您将开始看到超出每个节点可以处理的惩罚,因此具有良好大小池的 execute() 对我来说很有意义。更好的是,使用反应式架构来避免创建太多除了等待什么都不做的线程,如此多的线程会导致客户端浪费上下文切换。对于较少数量的请求,通过避免线程池,executeAsync() 会更好。
DefaultResultSetFuture future = new DefaultResultSetFuture(..., makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();