Sparksession 读取多个文件而不是使用模式
Spark Session read mulitple files instead of using pattern
我正在尝试使用 SparkSession
从 HDFS 上的文件夹中读取几个 CSV 文件(即我不想读取文件夹中的所有文件)
我在 运行(最后的代码)时收到以下错误:
Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv
我不想在阅读时使用模式,例如 /home/temp/*.csv
,原因是将来我有逻辑从 100 个 CSV 文件中只选择文件夹中的一两个文件
请指教
SparkSession sparkSession = SparkSession
.builder()
.appName(SparkCSVProcessors.class.getName())
.master(master).getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
.filter(name -> name.toString().endsWith(".csv"))
.map(name -> name.toString())
.collect(Collectors.toSet());
SQLContext sqlCtx = sparkSession.sqlContext();
Dataset<Row> rawDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("com.databricks.spark.csv")
.option("delimiter", ",")
//.load(String.join(" , ", fileSet));
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
更新
我可以迭代文件并进行合并,如下所示。有没有更好的方法请推荐...
Dataset<Row> unifiedDataset = null;
for (String fileName : fileSet) {
Dataset<Row> tempDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("csv")
.option("delimiter", ",")
.load(fileName);
if (unifiedDataset != null) {
unifiedDataset= unifiedDataset.unionAll(tempDataset);
} else {
unifiedDataset = tempDataset;
}
}
您的问题是您正在创建一个值为以下的字符串:
"/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"
而不是将两个文件名作为参数传递,这应该通过以下方式完成:
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv",
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
逗号 必须在字符串之外,您应该有两个值,而不是一个字符串。
根据我的理解,您想从 HDFS 读取多个文件而不使用像“/path/*.csv”这样的正则表达式。你缺少的是每条路径都需要用引号分开并用“,”
分隔
您可以使用下面的代码阅读,确保您已经添加了 SPARK CSV 库:
sqlContext.read.format("csv").load("/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv","/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv")
模式也很有用。
您希望同时 select 两个文件。
如果它们是连续的,那么你可以做类似
的事情
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_[1-2].csv")
如果有更多文件,那么只需执行 input_[1-5].csv
我正在尝试使用 SparkSession
从 HDFS 上的文件夹中读取几个 CSV 文件(即我不想读取文件夹中的所有文件)
我在 运行(最后的代码)时收到以下错误:
Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv
我不想在阅读时使用模式,例如 /home/temp/*.csv
,原因是将来我有逻辑从 100 个 CSV 文件中只选择文件夹中的一两个文件
请指教
SparkSession sparkSession = SparkSession
.builder()
.appName(SparkCSVProcessors.class.getName())
.master(master).getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
.filter(name -> name.toString().endsWith(".csv"))
.map(name -> name.toString())
.collect(Collectors.toSet());
SQLContext sqlCtx = sparkSession.sqlContext();
Dataset<Row> rawDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("com.databricks.spark.csv")
.option("delimiter", ",")
//.load(String.join(" , ", fileSet));
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
更新
我可以迭代文件并进行合并,如下所示。有没有更好的方法请推荐...
Dataset<Row> unifiedDataset = null;
for (String fileName : fileSet) {
Dataset<Row> tempDataset = sparkSession.read()
.option("inferSchema", "true")
.option("header", "true")
.format("csv")
.option("delimiter", ",")
.load(fileName);
if (unifiedDataset != null) {
unifiedDataset= unifiedDataset.unionAll(tempDataset);
} else {
unifiedDataset = tempDataset;
}
}
您的问题是您正在创建一个值为以下的字符串:
"/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, /home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"
而不是将两个文件名作为参数传递,这应该通过以下方式完成:
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv",
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");
逗号 必须在字符串之外,您应该有两个值,而不是一个字符串。
根据我的理解,您想从 HDFS 读取多个文件而不使用像“/path/*.csv”这样的正则表达式。你缺少的是每条路径都需要用引号分开并用“,”
分隔您可以使用下面的代码阅读,确保您已经添加了 SPARK CSV 库:
sqlContext.read.format("csv").load("/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv","/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv")
模式也很有用。 您希望同时 select 两个文件。 如果它们是连续的,那么你可以做类似
的事情.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_[1-2].csv")
如果有更多文件,那么只需执行 input_[1-5].csv