当 spark-csv 不会将 DataFrame 保存到文件时,是否有解释?

Is there an explanation when spark-csv won't save a DataFrame to file?

dataFrame.coalesce(1).write().save("path") 有时只写入 _SUCCESS 和 ._SUCCESS.crc 文件而没有预期的 *.csv.gz 即使在非空输入 DataFrame

文件保存密码:

private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
    dataFrame.coalesce(1)
            .write()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            .save("file:///" + directory);
}

文件获取代码:

static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}

文件获取错误示例:

java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
    /tmp/temp5889805853850415940/_SUCCESS, 
    /tmp/temp5889805853850415940/._SUCCESS.crc

我依赖于这种期望,谁能解释一下为什么会这样?

Output file should (must by logic) contain at least the header line and some data lines. But he does not exist at all

此评论有点误导。根据 Github 上的代码,只有当 Dataframe 为空时才会发生这种情况,并且不会生成 SUCCESS 文件。考虑到这些文件存在 - Dataframe 不为空并且代码中的 writeCsvToDirectory 被触发。

我有几个问题:

  • 您的 Spark 作业是否顺利完成?
  • SUCCESS 文件的时间戳是否更新?

我的两个主要嫌疑人是:

  1. coalesce(1) - 如果你有很多数据,这可能会失败
  2. SaveMode.Overwrite - 我感觉那些 SUCCESS 文件在之前运行的那个文件夹中

我自己的解决方案解决了这个问题。

我把.save("file://"换成了hadoopFileSystem.copyToLocalFile

问题是 .save("file:// 只能与 SparkSession.builder().master("local") 一起工作,其中 hdfs:// 由大师的 file:// 模拟。

理论上我可能是错的,但它确实有效。

static Path writeCsvToTemporaryDirectory(Dataset<Row> dataFrame) throws IOException {
    String temporaryDirectoryName = getTemporaryDirectoryName();

    writeCsvToDirectory(dataFrame, temporaryDirectoryName, sparkContext);

    return Paths.get(temporaryDirectoryName);
}

static void writeCsvToDirectory(Dataset<Row> dataFrame, String directory) throws IOException {
    dataFrame.coalesce(1)
        .write()
        .option("header", "true")
        .option("delimiter", "\t")
        .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
        .mode(SaveMode.Overwrite)
        .csv(directory);

    FileSystem hadoopFileSystem = FileSystem.get(sparkContext.hadoopConfiguration());

    hadoopFileSystem.copyToLocalFile(true,
        new org.apache.hadoop.fs.Path(directory),
        new org.apache.hadoop.fs.Path(directory));
}

static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";

    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        return stream.iterator().next();
    } catch (NoSuchElementException e) {
        throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob), e);
    }
}

Path temporaryDirectory = writeCsvToTemporaryDirectory(dataFrame);
Path temporaryFile = DataFrameIOUtils.getTemporaryCsvFile(temporaryDirectory);

try {
    return otherStorage.upload(temporaryFile, name, fields).join();
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    removeTemporaryDirectory(temporaryDirectory);
}

您选择写入 csv 文件取决于您的存储。 如果你在 hdfs 上写,一切都很好。但是无论何时您决定在本地文件系统中写入,您都必须注意 驱动程序本地文件系统 中不会写入任何内容,并且您的数据将在工作人员的文件系统中,您应该在工作人员的文件系统中找到它们存储。

两种解法:

  1. 运行 本地模式下的 Spark

设置 mater local[NUMBER_OF_CORES] 你可以通过 --master local[10] config

提交你的作业
  1. 写入分布式文件系统

将数据写入分布式文件系统,如 s3、hdfs、...