使用 Scala / Phantom-DSL 异步读取大型 Cassandra table
Async reading of large Cassandra table using Scala / Phantom-DSL
我在读取包含 >800k 行的 table 时遇到问题。我需要从上到下阅读行以便处理它们。
我为此目的使用 Scala 和 Phantom。
这是我的 table 样子。
CREATE TABLE raw (
id uuid PRIMARY KEY,
b1 text,
b2 timestamp,
b3 text,
b4 text,
b5 text
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
到目前为止,我已尝试使用以下方法阅读 table:
def getAllRecords : Future[Seq[row]] = select.fetch
或更花哨的 Play Enumerator 并将其与 Iteratee 组合
def getAllRecords : Enumerator = select.fetchEnumrator
这一切都不起作用,似乎 cassandra/driver/my 程序总是试图预先读取所有记录,我在这里缺少什么?
您是否尝试过在更大的读取测试中检查代码?
class IterateeBigReadPerformanceTest extends BigTest with ScalaFutures {
it should "read the correct number of records found in the table" in {
val counter: AtomicLong = new AtomicLong(0)
val result = TestDatabase.primitivesJoda.select
.fetchEnumerator run Iteratee.forEach {
r => counter.incrementAndGet()
}
result.successful {
query => {
info(s"done, reading: ${counter.get}")
counter.get() shouldEqual 2000000
}
}
}
}
这不会预先读取您的记录。事实上,我们进行了超过 运行 一个多小时的测试,以保证足够的 GC 暂停、没有 GC 开销、permgen/metaspace 压力保持在范围内等。
如果确实有任何改变,那只是错误,但这应该仍然有效。
我在读取包含 >800k 行的 table 时遇到问题。我需要从上到下阅读行以便处理它们。
我为此目的使用 Scala 和 Phantom。
这是我的 table 样子。
CREATE TABLE raw (
id uuid PRIMARY KEY,
b1 text,
b2 timestamp,
b3 text,
b4 text,
b5 text
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
到目前为止,我已尝试使用以下方法阅读 table:
def getAllRecords : Future[Seq[row]] = select.fetch
或更花哨的 Play Enumerator 并将其与 Iteratee 组合
def getAllRecords : Enumerator = select.fetchEnumrator
这一切都不起作用,似乎 cassandra/driver/my 程序总是试图预先读取所有记录,我在这里缺少什么?
您是否尝试过在更大的读取测试中检查代码?
class IterateeBigReadPerformanceTest extends BigTest with ScalaFutures {
it should "read the correct number of records found in the table" in {
val counter: AtomicLong = new AtomicLong(0)
val result = TestDatabase.primitivesJoda.select
.fetchEnumerator run Iteratee.forEach {
r => counter.incrementAndGet()
}
result.successful {
query => {
info(s"done, reading: ${counter.get}")
counter.get() shouldEqual 2000000
}
}
}
}
这不会预先读取您的记录。事实上,我们进行了超过 运行 一个多小时的测试,以保证足够的 GC 暂停、没有 GC 开销、permgen/metaspace 压力保持在范围内等。
如果确实有任何改变,那只是错误,但这应该仍然有效。