格式(删除 class/parens)Spark CSV saveAsTextFile 输出?
Format (remove class/parens) Spark CSV saveAsTextFile output?
我正在尝试从通过 saveAsTextFile 保存的 CSV 数据中去除换行 class 或数组文本,而无需非 Spark post 处理步骤。
我在大文件中有一些 TSV 数据,我将其提供给 RDD。
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1)))
testRdd.saveAsTextFile("test")
这将保存由 class 名称包装的数据:
head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")
我也试过将它转化为未命名的 class (?) 而不是 class.
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1)))
testRdd.saveAsTextFile("test2")
这会产生
("1969720fb3100608b38297aad8b3be93","active")
仍然需要 post 处理以移除包装括号。
为了去除包装字符,我尝试了 flatMap(),但 RDD 显然不是正确的类型:
testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
found : ((String, String)) => (String, String)
required: ((String, String)) => TraversableOnce[?]
testRdd.flatMap(identity).saveAsTextFile("test3")
那么...我是否需要将 RDD 转换为其他类型的 RDD,或者是否有另一种方法可以将 RDD 保存为 CSV 以便剥离环绕文本?
谢谢!
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => x(0)+","+x(1))
这会将输出写为 csv
您可以尝试以下方法:
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
.map(x => x.toLowerCase.split('\t'))
.map(x => x(0)+","+x(1))
我们听到的是在过滤您的 header 之后,您可以在同一映射段落中将字符串小写,从而节省一些不必要的额外映射。
这将创建一个 RDD[String],您可以将其另存为 CSV 格式。
PS:
保存的rdd输出的扩展名不是csv但是格式是!
这不是最佳且唯一的解决方案,但它会为您完成工作!
你可以看看 Spark CSV Library。
val logFile = "/input.csv"
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(master="local", appName="Mi app", conf)
val logData = sc.textFile(logFile, 2).cache()
val lower = logData.map(line => line.toLowerCase)
我正在尝试从通过 saveAsTextFile 保存的 CSV 数据中去除换行 class 或数组文本,而无需非 Spark post 处理步骤。
我在大文件中有一些 TSV 数据,我将其提供给 RDD。
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1)))
testRdd.saveAsTextFile("test")
这将保存由 class 名称包装的数据:
head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")
我也试过将它转化为未命名的 class (?) 而不是 class.
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1)))
testRdd.saveAsTextFile("test2")
这会产生
("1969720fb3100608b38297aad8b3be93","active")
仍然需要 post 处理以移除包装括号。
为了去除包装字符,我尝试了 flatMap(),但 RDD 显然不是正确的类型:
testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
found : ((String, String)) => (String, String)
required: ((String, String)) => TraversableOnce[?]
testRdd.flatMap(identity).saveAsTextFile("test3")
那么...我是否需要将 RDD 转换为其他类型的 RDD,或者是否有另一种方法可以将 RDD 保存为 CSV 以便剥离环绕文本?
谢谢!
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => x(0)+","+x(1))
这会将输出写为 csv
您可以尝试以下方法:
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
.map(x => x.toLowerCase.split('\t'))
.map(x => x(0)+","+x(1))
我们听到的是在过滤您的 header 之后,您可以在同一映射段落中将字符串小写,从而节省一些不必要的额外映射。
这将创建一个 RDD[String],您可以将其另存为 CSV 格式。
PS:
保存的rdd输出的扩展名不是csv但是格式是!
这不是最佳且唯一的解决方案,但它会为您完成工作!
你可以看看 Spark CSV Library。
val logFile = "/input.csv"
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(master="local", appName="Mi app", conf)
val logData = sc.textFile(logFile, 2).cache()
val lower = logData.map(line => line.toLowerCase)