Cassandra 大 table 迁移瓶颈

Cassandra big table migration bottleneck

我正在尝试将 cassandra 中的大 table 迁移到新的空 table(具有不同的主键),在相同的键空间和集群中,使用 spark 1.2.1:

val rdd_table_a = sc.cassandraTable("keyspace", "table_a").filter(row => row.getLong("a") >= start_a && row.getLong("a") <= end_a)

    rdd_table_a.map(row => { 
        val a = row.getLong("a")
        val b = row.getLong("b")
        val c = row.getString("c")
        val d = row.getString("d")
        val new_a = generateSomeNewValue(a)
        connector.withSessionDo(session => {
            val statement = session.prepare(s"INSERT INTO keyspace.table_b (new_a, c, b, a, d) " + "values (?, ?, ?, ?, ?)")
            val bound = statement.bind(new_a, c, b, a, d)
            session.executeAsync(bound)
        })
    }).foreach(x => x.getUninterruptibly())

table 有超过 1B 行,即使我试图处理它的一小部分,它也需要 7 个小时以上。 我在文档中搜索但没有找到 - connector.withSessionDo 在每个循环迭代中打开另一个会话吗?

以上代码片段中的瓶颈可能是什么?

conn.withSessionDo 使用与 Cassandra 的当前共享连接执行自定义 CQL 查询:

Allows to use Cassandra Session in a safe way without risk of forgetting to close it. The Session object obtained through this method is a proxy to a shared, single Session associated with the cluster.

Internally, the shared underlying Session will be closed shortly after all the proxies are closed.

您可以使用更典型的 saveToCassandra 方法重写您的代码。

就我使用 Spark+Cassandra 的个人经验而言,此类查询最慢的一点是 Cassandra 本身:对大表的数据扫描非常慢(与 Parquet 相比)。