C* 迁移- 将 1B+ 行 table 数据移动到新模式 table
C* migrations- move 1B+ rows table data to new schema table
我使用 DSE 4.7 datastax-enterprise、C* 2.1.5、spark 1.2.1,需要将数据从一个大 table 迁移到具有不同模式的新空 table以及需要从大 table.
中的切除列之一生成的附加列
我知道可以通过 spark 将 table 数据迁移到另一个具有新模式的 table 或通过复制命令到 cqlsh,但我对 工具 感兴趣,它可以为我提供未来迁移的长期解决方案以及管理和规划迁移的更多选项。
我认为这是一个普遍的问题,我没有找到任何可靠的解决方案。
有什么想法吗?
我一直坚信 Spark 是完成这项工作的最佳工具。
我已经测试了下面的代码,结果很好。
import java.sql.Date
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import java.sql._
import com.github.nscala_time.time.Imports._
object Migration {
def main(args: scala.Array[String]) {
def changeDate(created: java.util.Date) : String = {
var sDate = new DateTime(created)
var sDay = sDate.getDayOfMonth()
var sMonth = sDate.getMonthOfYear()
var sYear = sDate.getYear()
var created_date = "" + sYear + "-" + sMonth + "-" + sDay
created_date //return
}
//spark configuration
val conf = new SparkConf().setAppName("migration")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val connector = CassandraConnector(conf)
val rdd = sc.cassandraTable("keyspace", "table_a")
println("Starting migration...")
rdd.map(row => {
val x = new java.util.Date(row.getLong("x"))
val y = new java.util.Date(row.getLong("y"))
val z = row.getString("z")
val t = row.getString("t")
val k = changeDate(x)
connector.withSessionDo(session => {
val statement = session.prepare(s"INSERT INTO keyspace.table_b (k, y, z, x, t) " + "values (?, ?, ?, ?, ?)")
val bound = statement.bind(k, y, z, x, t)
session.executeAsync(bound)
})
}).foreach(x => x.getUninterruptibly())
println("Done.")
} }
我使用 DSE 4.7 datastax-enterprise、C* 2.1.5、spark 1.2.1,需要将数据从一个大 table 迁移到具有不同模式的新空 table以及需要从大 table.
中的切除列之一生成的附加列我知道可以通过 spark 将 table 数据迁移到另一个具有新模式的 table 或通过复制命令到 cqlsh,但我对 工具 感兴趣,它可以为我提供未来迁移的长期解决方案以及管理和规划迁移的更多选项。
我认为这是一个普遍的问题,我没有找到任何可靠的解决方案。
有什么想法吗?
我一直坚信 Spark 是完成这项工作的最佳工具。 我已经测试了下面的代码,结果很好。
import java.sql.Date
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import java.sql._
import com.github.nscala_time.time.Imports._
object Migration {
def main(args: scala.Array[String]) {
def changeDate(created: java.util.Date) : String = {
var sDate = new DateTime(created)
var sDay = sDate.getDayOfMonth()
var sMonth = sDate.getMonthOfYear()
var sYear = sDate.getYear()
var created_date = "" + sYear + "-" + sMonth + "-" + sDay
created_date //return
}
//spark configuration
val conf = new SparkConf().setAppName("migration")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val connector = CassandraConnector(conf)
val rdd = sc.cassandraTable("keyspace", "table_a")
println("Starting migration...")
rdd.map(row => {
val x = new java.util.Date(row.getLong("x"))
val y = new java.util.Date(row.getLong("y"))
val z = row.getString("z")
val t = row.getString("t")
val k = changeDate(x)
connector.withSessionDo(session => {
val statement = session.prepare(s"INSERT INTO keyspace.table_b (k, y, z, x, t) " + "values (?, ?, ?, ?, ?)")
val bound = statement.bind(k, y, z, x, t)
session.executeAsync(bound)
})
}).foreach(x => x.getUninterruptibly())
println("Done.")
} }