格式(删除 class/parens)Spark CSV saveAsTextFile 输出?

Format (remove class/parens) Spark CSV saveAsTextFile output?

我正在尝试从通过 saveAsTextFile 保存的 CSV 数据中去除换行 class 或数组文本,而无需非 Spark post 处理步骤。

我在大文件中有一些 TSV 数据,我将其提供给 RDD。

 val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1)))

testRdd.saveAsTextFile("test")

这将保存由 class 名称包装的数据:

head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")

我也试过将它转化为未命名的 class (?) 而不是 class.

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1)))

testRdd.saveAsTextFile("test2")

这会产生

("1969720fb3100608b38297aad8b3be93","active")

仍然需要 post 处理以移除包装括号。

为了去除包装字符,我尝试了 flatMap(),但 RDD 显然不是正确的类型:

testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
 found   : ((String, String)) => (String, String)
 required: ((String, String)) => TraversableOnce[?]
              testRdd.flatMap(identity).saveAsTextFile("test3")

那么...我是否需要将 RDD 转换为其他类型的 RDD,或者是否有另一种方法可以将 RDD 保存为 CSV 以便剥离环绕文本?

谢谢!

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => x(0)+","+x(1))

这会将输出写为 csv

您可以尝试以下方法:

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
                                 .map(x => x.toLowerCase.split('\t'))
                                 .map(x => x(0)+","+x(1))

我们听到的是在过滤您的 header 之后,您可以在同一映射段落中将字符串小写,从而节省一些不必要的额外映射。

这将创建一个 RDD[String],您可以将其另存为 CSV 格式。

PS:

  • 保存的rdd输出的扩展名不是csv但是格式是!

  • 这不是最佳且唯一的解决方案,但它会为您完成工作!

你可以看看 Spark CSV Library

val logFile = "/input.csv"

val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")

val sc = new SparkContext(master="local", appName="Mi app", conf)

val logData = sc.textFile(logFile, 2).cache()

val lower = logData.map(line => line.toLowerCase)