Cassandra 大 table 迁移瓶颈
Cassandra big table migration bottleneck
我正在尝试将 cassandra 中的大 table 迁移到新的空 table(具有不同的主键),在相同的键空间和集群中,使用 spark 1.2.1:
val rdd_table_a = sc.cassandraTable("keyspace", "table_a").filter(row => row.getLong("a") >= start_a && row.getLong("a") <= end_a)
rdd_table_a.map(row => {
val a = row.getLong("a")
val b = row.getLong("b")
val c = row.getString("c")
val d = row.getString("d")
val new_a = generateSomeNewValue(a)
connector.withSessionDo(session => {
val statement = session.prepare(s"INSERT INTO keyspace.table_b (new_a, c, b, a, d) " + "values (?, ?, ?, ?, ?)")
val bound = statement.bind(new_a, c, b, a, d)
session.executeAsync(bound)
})
}).foreach(x => x.getUninterruptibly())
table 有超过 1B 行,即使我试图处理它的一小部分,它也需要 7 个小时以上。
我在文档中搜索但没有找到 - connector.withSessionDo 在每个循环迭代中打开另一个会话吗?
以上代码片段中的瓶颈可能是什么?
conn.withSessionDo
使用与 Cassandra 的当前共享连接执行自定义 CQL 查询:
Allows to use Cassandra Session
in a safe way without risk of forgetting to close it. The Session
object obtained through this method is a proxy to a shared, single Session
associated with the cluster.
Internally, the shared underlying Session
will be closed shortly after all the proxies are closed.
您可以使用更典型的 saveToCassandra
方法重写您的代码。
就我使用 Spark+Cassandra 的个人经验而言,此类查询最慢的一点是 Cassandra 本身:对大表的数据扫描非常慢(与 Parquet 相比)。
我正在尝试将 cassandra 中的大 table 迁移到新的空 table(具有不同的主键),在相同的键空间和集群中,使用 spark 1.2.1:
val rdd_table_a = sc.cassandraTable("keyspace", "table_a").filter(row => row.getLong("a") >= start_a && row.getLong("a") <= end_a)
rdd_table_a.map(row => {
val a = row.getLong("a")
val b = row.getLong("b")
val c = row.getString("c")
val d = row.getString("d")
val new_a = generateSomeNewValue(a)
connector.withSessionDo(session => {
val statement = session.prepare(s"INSERT INTO keyspace.table_b (new_a, c, b, a, d) " + "values (?, ?, ?, ?, ?)")
val bound = statement.bind(new_a, c, b, a, d)
session.executeAsync(bound)
})
}).foreach(x => x.getUninterruptibly())
table 有超过 1B 行,即使我试图处理它的一小部分,它也需要 7 个小时以上。 我在文档中搜索但没有找到 - connector.withSessionDo 在每个循环迭代中打开另一个会话吗?
以上代码片段中的瓶颈可能是什么?
conn.withSessionDo
使用与 Cassandra 的当前共享连接执行自定义 CQL 查询:
Allows to use Cassandra
Session
in a safe way without risk of forgetting to close it. TheSession
object obtained through this method is a proxy to a shared, singleSession
associated with the cluster.Internally, the shared underlying
Session
will be closed shortly after all the proxies are closed.
您可以使用更典型的 saveToCassandra
方法重写您的代码。
就我使用 Spark+Cassandra 的个人经验而言,此类查询最慢的一点是 Cassandra 本身:对大表的数据扫描非常慢(与 Parquet 相比)。