错误 com.websudos.phantom - 批量太大

ERROR com.websudos.phantom - Batch too large

我收到以下错误:

22:24:34.419 [run-main-0] DEBUG com.websudos.phantom - Executing query: com.datastax.driver.core.BatchStatement@3f4f5b68
22:24:34.426 [pool-15-thread-3] ERROR com.websudos.phantom - Batch too large
[error] (run-main-0) com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large

重新运行代码,每次都在以下位置出现此错误:

cqlsh> select count(*) from superchain.blocks  limit 1000000;

 count
-------
 51728

(1 rows)

Warnings :
Aggregation query used without partition key

提前感谢您的任何见解。

+++更新+++

所以违规代码是

//This file is Database.scala
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
  def insertBlock(block: Block) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.block.insertNewRecord(block))
      .future()
  }

  def insertTransaction(tx: Transaction) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.tx.insertNewTransaction(tx))
      .future()
  }

  object block extends BlockTable with keyspace.Connector

  object tx extends TransactionTable with keyspace.Connector


}

object ChainDatabase extends Database(Config.keySpaceDefinition)

下面是Transaction的插入函数,Block也有类似的代码

已尝试关注

https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e#.7zdd0qopv

&&

https://github.com/outworkers/phantom/wiki/Batch-statements

但我仍在努力寻找不会导致 Batch too large 错误的实现。

//This file is Transaction.scala
abstract class TransactionTable extends TransactionColumnFamily with RootConnector {

  override val tableName = "transactions"

  def insertNew(tx: Transaction): Future[ResultSet] = insertNewTransaction(tx).future()

  def insertNewTransaction(tx: Transaction) = {
    insert
      .value(_.txid, tx.txid)
      .value(_.version, tx.version)
      .value(_.locktime, tx.locktime)
      .value(_.vout, tx.vout)
      .value(_.vin, tx.vin)
  }

}

也许你误解了 Cassandra 中批处理的目的。

实际上它们是为了原子性而不是 运行 多个查询 "faster".

可以在这里找到很好的解释:

https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/

您遇到的错误不是因为 table 的大小,而是因为批处理中的查询数量。在任何给定批次中,您最多可以同时 运行 100 个查询。

与此同时,您几乎 99% 的人都使用了不太理想的方法,因为您真的不想在一个批次中进行如此多的查询。正如 Thiago 所建议的那样,批处理旨在保证原子性,而不是优化性能。

如果您想简单地进行并行查询,只需使用 Future.sequence,它将使用 fork join pool 类的方法来并行化操作。

错误来自 Cassandra,而不是来自 phantom。无论您在客户端使用哪种方法,批量大小都是有上限的。

// Assuming you have a list of queries:
val execution = Future.sequence(queries map (_.future())

希望对您有所帮助!

更新

假设您有一个交易清单。

val list: List[Transaction] = ..
// all you need is
Future.sequence(list.map(tr => database.transactionTable.insertNew(tr))

这将产生一个未来,它将在所有基础期货完成时完成,有效地为您提供 return 类型:来自原始 List[Future[ResultSet]].

Future[List[ResultSet]]

正如其他人所说,您的第一条错误消息来自非常大的 BATCH 语句。 BATCH 语句不是为您在传统关系数据库中想到的批量插入而设计的。 BATCH 语句仅在跨多个非规范化表自动插入数据或使用 UNLOGGED BATCH 在同一分区键下插入数据时才有用。

Batch statements should NOT be used as an optimization technique,因为它们不是为提高速度而设计的,实际上会影响您的表现。

最后,这是一条错误消息,因为 Cassandra 客户端驱动程序正在尝试保护集群免受可能(并且将会)关闭集群中的节点的非常大的 BATCH 语句的影响。

其次,您指出 运行 a SELECT count(*) FROM table; 给您警告:

Aggregation query used without partition key

使用 count(*) 没有指定分区键是 antipattern。出于与上述类似的原因,它会对集群的稳定性产生负面影响。

最后,我怀疑您的 Cassandra DSL 库(不熟悉 Phantom-DSL)中的某处正在执行您不期望的 BATCH,或者您可能有意识地使用 BATCH 而没有完全理解它的适当用法。我知道在 spring-data 中,当您插入项目列表(这是一个可怕的反模式)时,他们会使用 BATCH,这可能会导致类似的错误。