在 hdfs 上将 csv 文件转换为另一种 csv 格式
Convert csv files to another csv format on hdfs
我必须在 hadoop 集群上实现一个 CSV 文件转换器 运行。主要线路是:
- 我在 hdfs 上有一堆 csv 文件,内容是任意的。
- 我知道如何使用 java 代码将它们转换为 "standard" 一个(即具有指定的行)。
- 转换需要一些参数(大约 10 或 15),每个文件不同。
- 我不介意分割输出文件。
- 但我希望它们有一个
input-filename[##].csv
名称以便以后区分它们 processing/visualization。
我的问题是:最好的方法是什么?
作为 hadoop 的新手,我正在考虑使用 map reduce 来执行此操作,但我对输出格式有疑问。另一方面,我可以使用 spark(在 scala 中使用我的 java 代码)。编码似乎很容易,但我不知道该怎么做。
非常感谢(更多)有经验的用户提出关于要实施的主要任务的意见。
Spark 是一个不错的选择。它还通过快速处理为您提供更大的灵活性。
用spark确实很简单:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hadoop.fs.FileUtil;
import java.io.File;
public class Converter {
static String appName = "CSV-Conversion"; // spark app name
static String master = "local"; // spark master
JavaSparkContext sc;
/**
* Init spark context
*/
public Converter(){
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
}
/**
* The conversion using spark
*/
public void convertFile(String inputFile, String outputDir){
JavaRDD<String> inputRdd = sc.textFile(inputFile);
JavaRDD<String> outputRdd = inputRdd.map(Converter::convertLine);
outputRdd.saveAsTextFile(outputDir);
}
/**
* The function that convert each file line.
*
* It is static (i.e. does not requires 'this') and does not use other object.
* If external objects (or not static method) are required, they must be
* serializable so that a copy can be send to each worker node.
* It is however better to avoid or at least minimize such data transfer.
*/
public static String convertLine(String line){
return line.toUpperCase();
}
/**
* As a stand-alone app
*/
public static void main(String[] args){
if(args.length!=2) {
System.out.println("Invalid number of arguments. Usage: Converter inputFile outputDir");
System.exit(1);
}
String inputFile = args[0];
String outputDir = args[1];
FileUtil.fullyDelete(new File(outputDir));
Converter c = new Converter();
c.convertFile(inputFile,outputDir);
}
}
我必须在 hadoop 集群上实现一个 CSV 文件转换器 运行。主要线路是:
- 我在 hdfs 上有一堆 csv 文件,内容是任意的。
- 我知道如何使用 java 代码将它们转换为 "standard" 一个(即具有指定的行)。
- 转换需要一些参数(大约 10 或 15),每个文件不同。
- 我不介意分割输出文件。
- 但我希望它们有一个
input-filename[##].csv
名称以便以后区分它们 processing/visualization。
我的问题是:最好的方法是什么?
作为 hadoop 的新手,我正在考虑使用 map reduce 来执行此操作,但我对输出格式有疑问。另一方面,我可以使用 spark(在 scala 中使用我的 java 代码)。编码似乎很容易,但我不知道该怎么做。
非常感谢(更多)有经验的用户提出关于要实施的主要任务的意见。
Spark 是一个不错的选择。它还通过快速处理为您提供更大的灵活性。
用spark确实很简单:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hadoop.fs.FileUtil;
import java.io.File;
public class Converter {
static String appName = "CSV-Conversion"; // spark app name
static String master = "local"; // spark master
JavaSparkContext sc;
/**
* Init spark context
*/
public Converter(){
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
}
/**
* The conversion using spark
*/
public void convertFile(String inputFile, String outputDir){
JavaRDD<String> inputRdd = sc.textFile(inputFile);
JavaRDD<String> outputRdd = inputRdd.map(Converter::convertLine);
outputRdd.saveAsTextFile(outputDir);
}
/**
* The function that convert each file line.
*
* It is static (i.e. does not requires 'this') and does not use other object.
* If external objects (or not static method) are required, they must be
* serializable so that a copy can be send to each worker node.
* It is however better to avoid or at least minimize such data transfer.
*/
public static String convertLine(String line){
return line.toUpperCase();
}
/**
* As a stand-alone app
*/
public static void main(String[] args){
if(args.length!=2) {
System.out.println("Invalid number of arguments. Usage: Converter inputFile outputDir");
System.exit(1);
}
String inputFile = args[0];
String outputDir = args[1];
FileUtil.fullyDelete(new File(outputDir));
Converter c = new Converter();
c.convertFile(inputFile,outputDir);
}
}