与 MongoDB 相比,Java 驱动程序的 Cassandra 批量写入性能非常糟糕
Cassandra Bulk-Write performance with Java Driver is atrocious compared to MongoDB
我已经为 MongoDB 和 Cassandra 构建了一个导入器。基本上导入器的所有操作都是相同的,除了最后一部分,数据形成以匹配所需的 cassandra table 模式和想要的 mongodb 文档结构。与 MongoDB 相比,Cassandra 的写入性能非常糟糕,我认为我做错了什么。
基本上,我的抽象导入器 class 加载数据,读出所有数据并将其传递给扩展 MongoDBImporter 或 CassandraImporter class 以将数据发送到数据库。一次针对一个数据库 - 没有 "dual" 同时插入 C* 和 MongoDB。进口商是 运行 在同一台机器上针对相同数量的节点 (6)。
问题:
MongoDB 导入在 57 分钟后完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大约相同。我的 Cassandra 导入器现在 运行ning 自 2.5 小时以来只有 5.000.000 插入行。我会等待进口商完成并在这里编辑实际完成时间。
我如何使用 Cassandra 导入:
我在摄取数据之前准备了两个语句一次。这两个语句都是 UPDATE 查询 因为有时我必须将数据附加到现有列表。我的 table 在开始导入之前被完全清除。准备好的语句被一遍又一遍地使用。
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
对于每一行,我创建一个 BoundStatement 并将该语句传递给我的 "custom" 批处理方法:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
使用 MongoDB,我可以一次插入 1000 个文档(这是最大值)而不会出现问题。对于 Cassandra,导入器在某些时候仅针对我的 10 个语句崩溃 com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
。我正在使用此代码来构建批次。顺便说一句,我之前从 1000、500、300、200、100、50、20 批量大小开始,但显然它们也不起作用。然后我将它设置为 10,它再次抛出异常。现在我不知道为什么它会坏。
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
我的 C* 架构如下所示
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>,
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestmap
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
有时我需要向现有行添加更多新事件。这就是为什么我需要一个 UDT 列表。我的 UDT 包含三个映射,因为事件创建者生成不同的数据(key/value 对类型 string/double/boolean)。我知道 UDT 已冻结,我无法触摸已摄取事件的地图。这对我来说很好,有时我只需要添加具有相同时间戳的新事件。我根据日志的创建者(一些传感器名称)以及记录日期(即“22-09-2016”)和时间戳的时间(为了更多地分发数据,同时将相关数据放在一起)进行分区一个分区)。
我在我的 pom.xml 中使用 Cassandra 3.0.8 和 Datastax Java 驱动程序,版本 3.1.0。
根据 ,我不应该通过在我的 cassandra.yaml
中调整 batch_size_fail_threshold_in_kb
来增加批量大小。那么...我的导入有什么问题或有什么问题?
更新
因此,我已将代码调整为 运行 异步查询并将当前 运行ning 插入存储在列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入发生错误时,该方法将等待 500 毫秒,直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。
但是在流式传输 3.300.000 行之后,有 280.000 个插入正在处理,但没有发生错误。这似乎是当前处理的插入数量看起来太高了。 6 个 cassandra 节点 运行 正在商用硬件上运行,该硬件已有 2 年历史。
大量并发插入(6 个节点为 280.000)是否有问题?我应该添加一个像 MAX_CONCURRENT_INSERT_LIMIT
这样的变量吗?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
当您在 Cassandra 中 运行 批处理时,它会选择一个节点作为协调器。然后该节点负责确保批量写入找到它们合适的节点。因此(例如)通过将 10000 次写入批处理在一起,您现在已经为一个节点分配了协调 10000 次写入的任务,其中大部分将用于不同的节点。这样做很容易翻倒一个节点,或者消除整个集群的延迟。因此,限制批量大小的原因。
问题是 Cassandra CQL BATCH 用词不当,它并不像您或其他任何人认为的那样。它不用于提高性能。并行、异步写入总是比 运行 将相同数量的语句 BATCH 在一起更快。
I know that I could easily batch 10.000 rows together because they will go to the same partition. ... Would you still use single row inserts (async) rather than batches?
这取决于写入性能是否是您的真正目标。如果是这样,那么我仍然会坚持使用并行、异步写入。
有关这方面的更多有用信息,请查看 DataStax 的 Ryan Svihla 的这两篇博文:
Cassandra: Batch loading without the Batch keyword
Cassandra: Batch Loading Without the Batch — The Nuanced Edition
在使用 C* 一段时间后,我相信您真的应该只使用批处理来保持多个表的同步。如果您不需要 功能 ,那么根本不要使用批处理,因为您 会 导致性能损失。
将数据加载到 C* 的正确方法是使用异步写入,如果您的集群跟不上摄取速率,则可以使用可选的背压。您应该将 "custom" 批处理方法替换为:
- 执行异步写入
- 控制您拥有的飞行写入数量
- 写入超时时执行一些重试。
要执行异步写入,请使用 .executeAsync
方法,这将 return 你一个 ResultSetFuture
对象。
为了控制有多少飞行中的查询只收集从列表中的 .executeAsync
方法检索到的 ResultSetFuture
对象,如果列表得到(这里的大概值)说 1k 元素然后等待让他们在发出更多写入之前完成。或者您可以等待第一个完成后再发出一个写入,只是为了保持列表完整。
最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:
- 用相同的超时值再次写入
- 使用增加的超时值再次写入
- 等待一段时间,然后使用相同的超时值再次写入
- 等待一段时间,然后使用增加的超时值再次写入
从 1 到 4,您的背压增加 强度。选择最适合您的情况。
问题更新后编辑
我觉得你的插入逻辑有点问题:
- 我没有看到任何重试逻辑
- 如果失败,您不会删除列表中的项目
- 您的
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)
是错误的,因为只有当发出的查询数量 > concurrentInsertLimit
时您才会休眠,并且由于 2. 您的线程将停在那里。
- 你从未设置为 false
concurrentInsertErrorOccured
我通常保留一个(失败的)查询列表,以便稍后重试。这给了我对查询的强大控制,当失败的查询开始累积时,我睡了一会儿,然后继续重试它们(最多 X 次,然后硬失败......)。
这个列表应该是非常动态的,例如,当查询失败时,您可以在其中添加项目,并在执行重试时删除项目。现在您可以了解您的集群的限制,并根据例如最后一秒失败查询的平均数量调整您的 concurrentInsertLimit
,或者坚持更简单的方法“如果我们有一个项目则暂停在重试列表中" 等等...
评论后编辑 2
由于您不需要任何重试逻辑,我会这样更改您的代码:
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
runningInsertList.remove(future);
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
//Sleep while the currently processing number of inserts is too high
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
if (!concurrentInsertErrorOccured) {
// Increase your ingestion rate if no query failed so far
concurrentInsertLimit += 10;
} else {
// Decrease your ingestion rate because at least one query failed
concurrentInsertErrorOccured = false;
concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
}
return;
}
您还可以通过用计数器替换您的 List<ResultSetFuture>
来稍微优化程序。
希望对您有所帮助。
我已经为 MongoDB 和 Cassandra 构建了一个导入器。基本上导入器的所有操作都是相同的,除了最后一部分,数据形成以匹配所需的 cassandra table 模式和想要的 mongodb 文档结构。与 MongoDB 相比,Cassandra 的写入性能非常糟糕,我认为我做错了什么。
基本上,我的抽象导入器 class 加载数据,读出所有数据并将其传递给扩展 MongoDBImporter 或 CassandraImporter class 以将数据发送到数据库。一次针对一个数据库 - 没有 "dual" 同时插入 C* 和 MongoDB。进口商是 运行 在同一台机器上针对相同数量的节点 (6)。
问题:
MongoDB 导入在 57 分钟后完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大约相同。我的 Cassandra 导入器现在 运行ning 自 2.5 小时以来只有 5.000.000 插入行。我会等待进口商完成并在这里编辑实际完成时间。
我如何使用 Cassandra 导入:
我在摄取数据之前准备了两个语句一次。这两个语句都是 UPDATE 查询 因为有时我必须将数据附加到现有列表。我的 table 在开始导入之前被完全清除。准备好的语句被一遍又一遍地使用。
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
对于每一行,我创建一个 BoundStatement 并将该语句传递给我的 "custom" 批处理方法:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
使用 MongoDB,我可以一次插入 1000 个文档(这是最大值)而不会出现问题。对于 Cassandra,导入器在某些时候仅针对我的 10 个语句崩溃 com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
。我正在使用此代码来构建批次。顺便说一句,我之前从 1000、500、300、200、100、50、20 批量大小开始,但显然它们也不起作用。然后我将它设置为 10,它再次抛出异常。现在我不知道为什么它会坏。
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
我的 C* 架构如下所示
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>,
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestmap
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
有时我需要向现有行添加更多新事件。这就是为什么我需要一个 UDT 列表。我的 UDT 包含三个映射,因为事件创建者生成不同的数据(key/value 对类型 string/double/boolean)。我知道 UDT 已冻结,我无法触摸已摄取事件的地图。这对我来说很好,有时我只需要添加具有相同时间戳的新事件。我根据日志的创建者(一些传感器名称)以及记录日期(即“22-09-2016”)和时间戳的时间(为了更多地分发数据,同时将相关数据放在一起)进行分区一个分区)。
我在我的 pom.xml 中使用 Cassandra 3.0.8 和 Datastax Java 驱动程序,版本 3.1.0。
根据 cassandra.yaml
中调整 batch_size_fail_threshold_in_kb
来增加批量大小。那么...我的导入有什么问题或有什么问题?
更新 因此,我已将代码调整为 运行 异步查询并将当前 运行ning 插入存储在列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入发生错误时,该方法将等待 500 毫秒,直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。
但是在流式传输 3.300.000 行之后,有 280.000 个插入正在处理,但没有发生错误。这似乎是当前处理的插入数量看起来太高了。 6 个 cassandra 节点 运行 正在商用硬件上运行,该硬件已有 2 年历史。
大量并发插入(6 个节点为 280.000)是否有问题?我应该添加一个像 MAX_CONCURRENT_INSERT_LIMIT
这样的变量吗?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
当您在 Cassandra 中 运行 批处理时,它会选择一个节点作为协调器。然后该节点负责确保批量写入找到它们合适的节点。因此(例如)通过将 10000 次写入批处理在一起,您现在已经为一个节点分配了协调 10000 次写入的任务,其中大部分将用于不同的节点。这样做很容易翻倒一个节点,或者消除整个集群的延迟。因此,限制批量大小的原因。
问题是 Cassandra CQL BATCH 用词不当,它并不像您或其他任何人认为的那样。它不用于提高性能。并行、异步写入总是比 运行 将相同数量的语句 BATCH 在一起更快。
I know that I could easily batch 10.000 rows together because they will go to the same partition. ... Would you still use single row inserts (async) rather than batches?
这取决于写入性能是否是您的真正目标。如果是这样,那么我仍然会坚持使用并行、异步写入。
有关这方面的更多有用信息,请查看 DataStax 的 Ryan Svihla 的这两篇博文:
Cassandra: Batch loading without the Batch keyword
Cassandra: Batch Loading Without the Batch — The Nuanced Edition
在使用 C* 一段时间后,我相信您真的应该只使用批处理来保持多个表的同步。如果您不需要 功能 ,那么根本不要使用批处理,因为您 会 导致性能损失。
将数据加载到 C* 的正确方法是使用异步写入,如果您的集群跟不上摄取速率,则可以使用可选的背压。您应该将 "custom" 批处理方法替换为:
- 执行异步写入
- 控制您拥有的飞行写入数量
- 写入超时时执行一些重试。
要执行异步写入,请使用 .executeAsync
方法,这将 return 你一个 ResultSetFuture
对象。
为了控制有多少飞行中的查询只收集从列表中的 .executeAsync
方法检索到的 ResultSetFuture
对象,如果列表得到(这里的大概值)说 1k 元素然后等待让他们在发出更多写入之前完成。或者您可以等待第一个完成后再发出一个写入,只是为了保持列表完整。
最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:
- 用相同的超时值再次写入
- 使用增加的超时值再次写入
- 等待一段时间,然后使用相同的超时值再次写入
- 等待一段时间,然后使用增加的超时值再次写入
从 1 到 4,您的背压增加 强度。选择最适合您的情况。
问题更新后编辑
我觉得你的插入逻辑有点问题:
- 我没有看到任何重试逻辑
- 如果失败,您不会删除列表中的项目
- 您的
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)
是错误的,因为只有当发出的查询数量 >concurrentInsertLimit
时您才会休眠,并且由于 2. 您的线程将停在那里。 - 你从未设置为 false
concurrentInsertErrorOccured
我通常保留一个(失败的)查询列表,以便稍后重试。这给了我对查询的强大控制,当失败的查询开始累积时,我睡了一会儿,然后继续重试它们(最多 X 次,然后硬失败......)。
这个列表应该是非常动态的,例如,当查询失败时,您可以在其中添加项目,并在执行重试时删除项目。现在您可以了解您的集群的限制,并根据例如最后一秒失败查询的平均数量调整您的 concurrentInsertLimit
,或者坚持更简单的方法“如果我们有一个项目则暂停在重试列表中" 等等...
评论后编辑 2
由于您不需要任何重试逻辑,我会这样更改您的代码:
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
runningInsertList.remove(future);
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
//Sleep while the currently processing number of inserts is too high
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
if (!concurrentInsertErrorOccured) {
// Increase your ingestion rate if no query failed so far
concurrentInsertLimit += 10;
} else {
// Decrease your ingestion rate because at least one query failed
concurrentInsertErrorOccured = false;
concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
while (runningInsertList.size() >= concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
}
return;
}
您还可以通过用计数器替换您的 List<ResultSetFuture>
来稍微优化程序。
希望对您有所帮助。