JUnit cannot delete @TempDir containing files created by Spark Structured Streaming
I wrote an integration test for my pipeline that checks whether the correct CSV file is generated:
class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    // this checks if the file is already available
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    // this gets content of file
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
            Files.walk(file.toPath())
                .filter(f -> f.toString().endsWith(suffix))
                .findFirst()
                .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {
        File file = new File(directory.toFile(), "output");
        List<Row> data =
            asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));
        Dataset<Row> dataset =
            spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);
        dataset.coalesce(1)
            .write()
            .option("header", "true")
            .option("delimiter", ";")
            .csv(file.getAbsolutePath());
        Awaitility.await()
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> isFileWithSuffixAvailable(file, ".csv"));
        Awaitility.await()
            .atMost(10, TimeUnit.SECONDS)
            .untilAsserted(
                () ->
                    assertThat(extractFileWithSuffixContent(file, ".csv"))
                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}
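The real code looks a bit different; this is just a reproducible example.
The Spark extension simply starts a local Spark session before each test and shuts it down afterwards.
The test passes, but when JUnit tries to clean up the @TempDir, the following exception is thrown: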
Failed to delete temp directory C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194. The following paths could not be deleted
Can I fix this error somehow? I tried waiting for Spark to stop using Awaitility, but that did not really help.
Or can I perhaps just ignore this error?
Quick guess: you need to close the stream returned by Files.walk. Quoting the docs:
If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close
method is invoked after the stream operations are completed.
To fix this, add a try-with-resources to the isFileWithSuffixAvailable method:
static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(directory.toPath())) {
        return walk.anyMatch(f -> f.toString().endsWith(suffix));
    }
}
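Note that extractFileWithSuffixContent in the question also calls Files.walk without closing the stream, so it probably needs the same treatment. Below is a minimal, self-contained sketch of both helpers with the stream closed. The class name TempDirFileHelpers is hypothetical, and AssertionError stands in for the question's AssertionException, whose origin I don't know. Whether this removes the cleanup failure entirely is an assumption: if Spark itself still holds handles on Windows, the directory may remain locked.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical holder class for the two helpers from the question.
final class TempDirFileHelpers {

    private TempDirFileHelpers() {}

    // Returns true if any path under the directory ends with the suffix.
    // try-with-resources closes the directory handles opened by Files.walk.
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        try (Stream<Path> walk = Files.walk(directory.toPath())) {
            return walk.anyMatch(p -> p.toString().endsWith(suffix));
        }
    }

    // Reads all lines of the first path under the directory that ends with the suffix.
    // The walk stream is closed before the file itself is read.
    static List<String> extractFileWithSuffixContent(File directory, String suffix) throws IOException {
        Path match;
        try (Stream<Path> walk = Files.walk(directory.toPath())) {
            match = walk.filter(p -> p.toString().endsWith(suffix))
                    .findFirst()
                    // AssertionError replaces the question's AssertionException (assumption).
                    .orElseThrow(() -> new AssertionError("No file ending with " + suffix));
        }
        return Files.readAllLines(match);
    }
}
```

The calls in testWrite stay the same; only the helpers change, so the Awaitility polling no longer leaves an open directory stream behind on each attempt.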