错误 com.websudos.phantom - 批量太大
ERROR com.websudos.phantom - Batch too large
我收到以下错误:
22:24:34.419 [run-main-0] DEBUG com.websudos.phantom - Executing query: com.datastax.driver.core.BatchStatement@3f4f5b68
22:24:34.426 [pool-15-thread-3] ERROR com.websudos.phantom - Batch too large
[error] (run-main-0) com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
重新运行代码,每次都在以下位置出现此错误:
cqlsh> select count(*) from superchain.blocks limit 1000000;
count
-------
51728
(1 rows)
Warnings :
Aggregation query used without partition key
提前感谢您的任何见解。
+++更新+++
所以违规代码是
//This file is Database.scala
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
def insertBlock(block: Block) = {
//should note here that have also tried Batch.unlogged to same effect
Batch.logged
.add(ChainDatabase.block.insertNewRecord(block))
.future()
}
def insertTransaction(tx: Transaction) = {
//should note here that have also tried Batch.unlogged to same effect
Batch.logged
.add(ChainDatabase.tx.insertNewTransaction(tx))
.future()
}
object block extends BlockTable with keyspace.Connector
object tx extends TransactionTable with keyspace.Connector
}
object ChainDatabase extends Database(Config.keySpaceDefinition)
下面是Transaction的插入函数,Block也有类似的代码
已尝试关注
&&
https://github.com/outworkers/phantom/wiki/Batch-statements
但我仍在努力寻找不会导致 Batch too large
错误的实现。
//This file is Transaction.scala
abstract class TransactionTable extends TransactionColumnFamily with RootConnector {
override val tableName = "transactions"
def insertNew(tx: Transaction): Future[ResultSet] = insertNewTransaction(tx).future()
def insertNewTransaction(tx: Transaction) = {
insert
.value(_.txid, tx.txid)
.value(_.version, tx.version)
.value(_.locktime, tx.locktime)
.value(_.vout, tx.vout)
.value(_.vin, tx.vin)
}
}
也许你误解了 Cassandra 中批处理的目的。
实际上它们是为了原子性而不是 运行 多个查询 "faster".
可以在这里找到很好的解释:
https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/
您遇到的错误不是因为 table 的大小,而是因为批处理中的查询数量。在任何给定批次中,您最多可以同时 运行 100 个查询。
与此同时,您几乎 99% 的人都使用了不太理想的方法,因为您真的不想在一个批次中进行如此多的查询。正如 Thiago 所建议的那样,批处理旨在保证原子性,而不是优化性能。
如果您想简单地进行并行查询,只需使用 Future.sequence
,它将使用 fork join pool 类的方法来并行化操作。
错误来自 Cassandra,而不是来自 phantom。无论您在客户端使用哪种方法,批量大小都是有上限的。
// Assuming you have a list of queries:
val execution = Future.sequence(queries map (_.future())
希望对您有所帮助!
更新
假设您有一个交易清单。
val list: List[Transaction] = ..
// all you need is
Future.sequence(list.map(tr => database.transactionTable.insertNew(tr))
这将产生一个未来,它将在所有基础期货完成时完成,有效地为您提供 return 类型:来自原始 List[Future[ResultSet]]
.
的 Future[List[ResultSet]]
正如其他人所说,您的第一条错误消息来自非常大的 BATCH 语句。 BATCH 语句不是为您在传统关系数据库中想到的批量插入而设计的。 BATCH 语句仅在跨多个非规范化表自动插入数据或使用 UNLOGGED BATCH 在同一分区键下插入数据时才有用。
Batch statements should NOT be used as an optimization technique,因为它们不是为提高速度而设计的,实际上会影响您的表现。
最后,这是一条错误消息,因为 Cassandra 客户端驱动程序正在尝试保护集群免受可能(并且将会)关闭集群中的节点的非常大的 BATCH 语句的影响。
其次,您指出 运行 a SELECT count(*) FROM table;
给您警告:
Aggregation query used without partition key
。
使用 count(*)
没有指定分区键是 antipattern。出于与上述类似的原因,它会对集群的稳定性产生负面影响。
最后,我怀疑您的 Cassandra DSL 库(不熟悉 Phantom-DSL)中的某处正在执行您不期望的 BATCH,或者您可能有意识地使用 BATCH 而没有完全理解它的适当用法。我知道在 spring-data 中,当您插入项目列表(这是一个可怕的反模式)时,他们会使用 BATCH,这可能会导致类似的错误。
我收到以下错误:
22:24:34.419 [run-main-0] DEBUG com.websudos.phantom - Executing query: com.datastax.driver.core.BatchStatement@3f4f5b68
22:24:34.426 [pool-15-thread-3] ERROR com.websudos.phantom - Batch too large
[error] (run-main-0) com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
重新运行代码,每次都在以下位置出现此错误:
cqlsh> select count(*) from superchain.blocks limit 1000000;
count
-------
51728
(1 rows)
Warnings :
Aggregation query used without partition key
提前感谢您的任何见解。
+++更新+++
所以违规代码是
//This file is Database.scala
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
def insertBlock(block: Block) = {
//should note here that have also tried Batch.unlogged to same effect
Batch.logged
.add(ChainDatabase.block.insertNewRecord(block))
.future()
}
def insertTransaction(tx: Transaction) = {
//should note here that have also tried Batch.unlogged to same effect
Batch.logged
.add(ChainDatabase.tx.insertNewTransaction(tx))
.future()
}
object block extends BlockTable with keyspace.Connector
object tx extends TransactionTable with keyspace.Connector
}
object ChainDatabase extends Database(Config.keySpaceDefinition)
下面是Transaction的插入函数,Block也有类似的代码
已尝试关注
&&
https://github.com/outworkers/phantom/wiki/Batch-statements
但我仍在努力寻找不会导致 Batch too large
错误的实现。
//This file is Transaction.scala
abstract class TransactionTable extends TransactionColumnFamily with RootConnector {
override val tableName = "transactions"
def insertNew(tx: Transaction): Future[ResultSet] = insertNewTransaction(tx).future()
def insertNewTransaction(tx: Transaction) = {
insert
.value(_.txid, tx.txid)
.value(_.version, tx.version)
.value(_.locktime, tx.locktime)
.value(_.vout, tx.vout)
.value(_.vin, tx.vin)
}
}
也许你误解了 Cassandra 中批处理的目的。
实际上它们是为了原子性而不是 运行 多个查询 "faster".
可以在这里找到很好的解释:
https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/
您遇到的错误不是因为 table 的大小,而是因为批处理中的查询数量。在任何给定批次中,您最多可以同时 运行 100 个查询。
与此同时,您几乎 99% 的人都使用了不太理想的方法,因为您真的不想在一个批次中进行如此多的查询。正如 Thiago 所建议的那样,批处理旨在保证原子性,而不是优化性能。
如果您想简单地进行并行查询,只需使用 Future.sequence
,它将使用 fork join pool 类的方法来并行化操作。
错误来自 Cassandra,而不是来自 phantom。无论您在客户端使用哪种方法,批量大小都是有上限的。
// Assuming you have a list of queries:
val execution = Future.sequence(queries map (_.future())
希望对您有所帮助!
更新
假设您有一个交易清单。
val list: List[Transaction] = ..
// all you need is
Future.sequence(list.map(tr => database.transactionTable.insertNew(tr))
这将产生一个未来,它将在所有基础期货完成时完成,有效地为您提供 return 类型:来自原始 List[Future[ResultSet]]
.
Future[List[ResultSet]]
正如其他人所说,您的第一条错误消息来自非常大的 BATCH 语句。 BATCH 语句不是为您在传统关系数据库中想到的批量插入而设计的。 BATCH 语句仅在跨多个非规范化表自动插入数据或使用 UNLOGGED BATCH 在同一分区键下插入数据时才有用。
Batch statements should NOT be used as an optimization technique,因为它们不是为提高速度而设计的,实际上会影响您的表现。
最后,这是一条错误消息,因为 Cassandra 客户端驱动程序正在尝试保护集群免受可能(并且将会)关闭集群中的节点的非常大的 BATCH 语句的影响。
其次,您指出 运行 a SELECT count(*) FROM table;
给您警告:
Aggregation query used without partition key
。
使用 count(*)
没有指定分区键是 antipattern。出于与上述类似的原因,它会对集群的稳定性产生负面影响。
最后,我怀疑您的 Cassandra DSL 库(不熟悉 Phantom-DSL)中的某处正在执行您不期望的 BATCH,或者您可能有意识地使用 BATCH 而没有完全理解它的适当用法。我知道在 spring-data 中,当您插入项目列表(这是一个可怕的反模式)时,他们会使用 BATCH,这可能会导致类似的错误。