Sparksession 读取多个文件而不是使用模式

Spark Session read mulitple files instead of using pattern

我正在尝试使用 SparkSession 从 HDFS 上的文件夹中读取几个 CSV 文件(即我不想读取文件夹中的所有文件)

我在 运行(最后的代码)时收到以下错误:

Path does not exist:
file:/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv,
/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv

我不想在阅读时使用模式,例如 /home/temp/*.csv,原因是将来我有逻辑从 100 个 CSV 文件中只选择文件夹中的一两个文件

请指教

    SparkSession sparkSession = SparkSession
            .builder()
            .appName(SparkCSVProcessors.class.getName())
            .master(master).getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");

    Set<String> fileSet = Files.list(Paths.get("/home/cloudera/works/JavaKafkaSparkStream/input/"))
            .filter(name -> name.toString().endsWith(".csv"))
            .map(name -> name.toString())
            .collect(Collectors.toSet());

    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> rawDataset = sparkSession.read()
            .option("inferSchema", "true")
            .option("header", "true")
            .format("com.databricks.spark.csv")
            .option("delimiter", ",")
            //.load(String.join(" , ", fileSet));
            .load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, " +
                    "/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");

更新

我可以迭代文件并进行合并,如下所示。有没有更好的方法请推荐...

    Dataset<Row> unifiedDataset = null;

    for (String fileName : fileSet) {
        Dataset<Row> tempDataset = sparkSession.read()
                .option("inferSchema", "true")
                .option("header", "true")
                .format("csv")
                .option("delimiter", ",")
                .load(fileName);
        if (unifiedDataset != null) {
            unifiedDataset= unifiedDataset.unionAll(tempDataset);
        } else {
            unifiedDataset = tempDataset;
        }
    }

您的问题是您正在创建一个值为以下的字符串:

"/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv, /home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv"

而不是将两个文件名作为参数传递,这应该通过以下方式完成:

.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv",
"/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv");

逗号 必须在字符串之外,您应该有两个值,而不是一个字符串。

根据我的理解,您想从 HDFS 读取多个文件而不使用像“/path/*.csv”这样的正则表达式。你缺少的是每条路径都需要用引号分开并用“,”

分隔

您可以使用下面的代码阅读,确保您已经添加了 SPARK CSV 库:

sqlContext.read.format("csv").load("/home/cloudera/works/JavaKafkaSparkStream/input/input_1.csv","/home/cloudera/works/JavaKafkaSparkStream/input/input_2.csv")

模式也很有用。 您希望同时 select 两个文件。 如果它们是连续的,那么你可以做类似

的事情
.load("/home/cloudera/works/JavaKafkaSparkStream/input/input_[1-2].csv")

如果有更多文件,那么只需执行 input_[1-5].csv