如何每几行加载几行csv

How to load csv couple of lines per couple of lines

我正在将 Spark 连接到 Cassandra,并且能够使用传统的 COPY 方法打印 CSV 的行。但是,如果 CSV 非常大,就像大数据中通常发生的那样,如何才能每行加载几行 CSV 文件以避免冻结相关问题等?

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SparkCassandra {

  def main(args: Array[String]): Unit = {

      val conf = new SparkConf().setAppName("SparkCassandra").setMaster("local").set("spark.cassandra.connection.host", "localhost")
      val sc = new SparkContext(conf)
      val my_rdd = sc.cassandraTable("my_keyspace", "my_csv")
      my_rdd.take(20).foreach(println)
      sc.stop()
  }
}

应该使用时间变量还是类似的东西?

如果您只想将数据加载到 Cassandra 或使用命令行从 Cassandra 卸载数据,我建议您查看 DataStax Bulk Loader (DSBulk) - 它针对加载数据进行了高度优化 to/from Cassandra/DSE。它适用于开源 Cassandra 和 DSE。

在最简单的情况下,从 table 加载和卸载将如下所示(默认格式为 CSV):

dsbulk load -k keyspace -t table -url my_file.csv
dsbulk unload -k keyspace -t table -url my_file.csv

对于更复杂的情况,您可能需要提供更多选项。您可以在 following series of the blog posts.

中找到更多信息

如果你想用 Spark 做到这一点,那么我建议使用 Dataframe API 而不是 RDD。在这种情况下,您只需使用标准 read & write 函数。

将数据从 Cassandra 导出到 CSV:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "ks").load()
data.write.format("csv").save("my_file.csv")

或从 CSV 中读取并存储在 Cassandra 中:

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode
val data = spark.read.format("csv").save("my_file.csv")
data.cassandraFormat("tbl", "ks").mode(SaveMode.Append).save()