Closing stream at the end in Apache Spark results in java.io.IOException: Stream closed
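Some time ago I ran into a problem reading zip files in Apache Spark. I shared my answer on Stack Overflow.
As @Programmer rightly pointed out, I was not closing the opened streams. I tried to do that with a partial function inside takeWhile (inspiration):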
    Stream.continually(zis.getNextEntry)
      .takeWhile {
        case null => zis.close(); false
        case _ => true
      }
      .flatMap { _ =>
        val br = new BufferedReader(new InputStreamReader(zis))
        Stream.continually(br.readLine())
          .takeWhile {
            case null => br.close(); false
            case _ => true
          }
      }
But it doesn't work!
When reading zip files, I now get this error:
Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 20, localhost): java.io.IOException: Stream closed
    at java.util.zip.ZipInputStream.ensureOpen(ZipInputStream.java:67)
    at java.util.zip.ZipInputStream.getNextEntry(ZipInputStream.java:116)
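As long as I leave the stream open, it works fine.
It seems that I am closing the stream and then trying to read from it again, but I don't know why this happens or how to fix it.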
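A likely explanation is that closing a BufferedReader also closes the InputStreamReader it wraps, which in turn closes the underlying ZipInputStream. The first br.close() would then shut zis before the next getNextEntry call. The sketch below tries to reproduce that outside Spark with a small in-memory archive; the object name, helper names, entry names and contents are all made up for illustration.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStreamReader}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object ZipCloseDemo {
  // Build a zip archive in memory (hypothetical helper, not part of the question).
  def makeZip(files: Seq[(String, String)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(bytes)
    files.foreach { case (name, content) =>
      zos.putNextEntry(new ZipEntry(name))
      zos.write(content.getBytes("UTF-8"))
      zos.closeEntry()
    }
    zos.close()
    bytes.toByteArray
  }

  // Returns true if getNextEntry throws after the per-entry reader was closed.
  def nextEntryFailsAfterReaderClose(): Boolean = {
    val data = makeZip(Seq("a.txt" -> "a1\na2", "b.txt" -> "b1"))
    val zis = new ZipInputStream(new ByteArrayInputStream(data))

    zis.getNextEntry // position the stream on the first entry
    val br = new BufferedReader(new InputStreamReader(zis))
    // Drain the first entry; readLine returns null at the end of the entry.
    Iterator.continually(br.readLine()).takeWhile(_ != null).foreach(_ => ())
    br.close() // closes the InputStreamReader and, through it, zis

    try { zis.getNextEntry; false }
    catch { case _: IOException => true } // "Stream closed"
  }
}
```

If nextEntryFailsAfterReaderClose() returns true, the per-entry br.close() is what invalidates zis for the second entry, which would match the stack trace above.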
Following @alexandre-dupriez's comment, I now close only the outer Stream, which helps:
    Stream.continually(zis.getNextEntry)
      .takeWhile {
        case null => zis.close(); false
        case _ => true
      }
      .flatMap { _ =>
        val br = new BufferedReader(new InputStreamReader(zis))
        Stream.continually(br.readLine()).takeWhile(_ != null)
      }
So I will need some time to confirm that it works correctly.
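To check the "close only the outer stream" variant without a Spark job, a minimal sketch like the following could be run locally. It uses Iterator instead of Stream (a substitution on my part, since Stream is deprecated in newer Scala versions), and the object name, helper and test data are hypothetical.

```scala
import java.io.{BufferedReader, ByteArrayOutputStream, InputStreamReader}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object ZipLines {
  // Build a zip archive in memory (hypothetical helper used only for testing).
  def makeZip(files: Seq[(String, String)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(bytes)
    files.foreach { case (name, content) =>
      zos.putNextEntry(new ZipEntry(name))
      zos.write(content.getBytes("UTF-8"))
      zos.closeEntry()
    }
    zos.close()
    bytes.toByteArray
  }

  // Lazily yields every line of every entry; zis is closed only once
  // getNextEntry returns null, i.e. after the last entry was consumed.
  def readLines(zis: ZipInputStream): Iterator[String] =
    Iterator.continually(zis.getNextEntry)
      .takeWhile {
        case null => zis.close(); false
        case _ => true
      }
      .flatMap { _ =>
        // The per-entry reader is deliberately not closed:
        // closing it would close zis as well.
        val br = new BufferedReader(new InputStreamReader(zis))
        Iterator.continually(br.readLine()).takeWhile(_ != null)
      }
}
```

One caveat with this design: the close happens as a side effect of iteration, so if the consumer stops early (for example with take(n)), zis is never closed. Wrapping the consumption in try/finally would be a more defensive alternative.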