Java IO:正在读取仍在写入的文件
Java IO: Reading a file that is still being written
我正在创建一个需要从仍在写入的文件中读取的程序。
主要问题是:如果读取和写入将使用 InputStream
和 OutputStream
类 运行 在单独的线程上执行,为了防止数据损坏,我需要注意哪些问题和边缘情况?
万一有人想知道我是否考虑过其他非 InputStream
方法,答案是肯定的,我有但不幸的是在这个项目中这是不可能的,因为该程序使用的库只能与InputStream
和 OutputStream
.
另外,一些读者问为什么需要这种并发症。文件写完了为什么不读?
原因是效率。该程序将执行以下操作
- 下载一系列字节块,每个块大小为 1.5MB。该程序将接收数千个这样的块,总计可达 30GB。此外,同时下载块是为了最大化带宽,因此它们 可能会乱序到达 。
- 程序将在每个块到达后立即发送它们进行处理。请注意,它们将按顺序送去处理。如果块 m 在块 m-1 之前到达,它们将在磁盘上缓冲,直到块 m-1 到达并发送以进行处理。
- 从块 0 到块 n 执行这些块的处理,直到处理完每个块
- 重新发送处理结果。
如果我们要等待整个文件传输完毕,这会给本应是实时系统的系统带来巨大的延迟。
使用 RandomAccessFile。通过 getChannel 或类似的可以使用 ByteBuffer.
您将无法 "insert" 或 "delete" 文件的中间部分。对于这样的目的,你原来的方法会很好,但使用两个文件。
对于并发:要保持同步,您可以维护文件的一个单一对象模型,并在那里进行更改。只有挂起的更改需要保留在内存中,其他分层数据可以根据需要重新读取和重新解析。
您应该使用 PipedInputStream 和 PipedOutputStream:
static Thread newCopyThread(InputStream is, OutputStream os) {
Thread t = new Thread() {
@Override
public void run() {
byte[] buffer = new byte[2048];
try {
while (true) {
int size = is.read(buffer);
if (size < 0) break;
os.write(buffer, 0, size);
}
is.close();
os.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
};
return t;
}
public void main(String[] args) throws IOException, InterruptedException {
ByteArrayInputStream bi = new ByteArrayInputStream("abcdefg".getBytes());
PipedInputStream is = new PipedInputStream();
PipedOutputStream os = new PipedOutputStream(is);
Thread p = newCopyThread(bi, os);
Thread c = newCopyThread(is, System.out);
p.start();
c.start();
p.join();
c.join();
}
所以你的问题(正如你现在已经解决的那样)是在块#1 到达之前你不能开始处理,你需要缓冲每个块#N(N > 1)直到你可以处理它们。
我会将每个块写入它们自己的文件并创建一个自定义 InputStream
来按顺序读取每个块。下载 chunkfile 时会被命名为 chunk.1.downloading
,当整个 chunk 被加载时它会被重命名为 chunk.1
.
自定义 InputStream
将检查文件 chunk.N
是否存在(其中 N = 1...X)。如果没有,它会阻塞。每次完全下载一个块时,都会通知 InputStream
,它将检查下载的块是否是下一个要处理的块。如果是,则正常读取,否则再次阻塞。
我正在创建一个需要从仍在写入的文件中读取的程序。
主要问题是:如果读取和写入将使用 InputStream
和 OutputStream
类 运行 在单独的线程上执行,为了防止数据损坏,我需要注意哪些问题和边缘情况?
万一有人想知道我是否考虑过其他非 InputStream
方法,答案是肯定的,我有但不幸的是在这个项目中这是不可能的,因为该程序使用的库只能与InputStream
和 OutputStream
.
另外,一些读者问为什么需要这种并发症。文件写完了为什么不读?
原因是效率。该程序将执行以下操作
- 下载一系列字节块,每个块大小为 1.5MB。该程序将接收数千个这样的块,总计可达 30GB。此外,同时下载块是为了最大化带宽,因此它们 可能会乱序到达 。
- 程序将在每个块到达后立即发送它们进行处理。请注意,它们将按顺序送去处理。如果块 m 在块 m-1 之前到达,它们将在磁盘上缓冲,直到块 m-1 到达并发送以进行处理。
- 从块 0 到块 n 执行这些块的处理,直到处理完每个块
- 重新发送处理结果。
如果我们要等待整个文件传输完毕,这会给本应是实时系统的系统带来巨大的延迟。
使用 RandomAccessFile。通过 getChannel 或类似的可以使用 ByteBuffer.
您将无法 "insert" 或 "delete" 文件的中间部分。对于这样的目的,你原来的方法会很好,但使用两个文件。
对于并发:要保持同步,您可以维护文件的一个单一对象模型,并在那里进行更改。只有挂起的更改需要保留在内存中,其他分层数据可以根据需要重新读取和重新解析。
您应该使用 PipedInputStream 和 PipedOutputStream:
static Thread newCopyThread(InputStream is, OutputStream os) {
Thread t = new Thread() {
@Override
public void run() {
byte[] buffer = new byte[2048];
try {
while (true) {
int size = is.read(buffer);
if (size < 0) break;
os.write(buffer, 0, size);
}
is.close();
os.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
};
return t;
}
public void main(String[] args) throws IOException, InterruptedException {
ByteArrayInputStream bi = new ByteArrayInputStream("abcdefg".getBytes());
PipedInputStream is = new PipedInputStream();
PipedOutputStream os = new PipedOutputStream(is);
Thread p = newCopyThread(bi, os);
Thread c = newCopyThread(is, System.out);
p.start();
c.start();
p.join();
c.join();
}
所以你的问题(正如你现在已经解决的那样)是在块#1 到达之前你不能开始处理,你需要缓冲每个块#N(N > 1)直到你可以处理它们。
我会将每个块写入它们自己的文件并创建一个自定义 InputStream
来按顺序读取每个块。下载 chunkfile 时会被命名为 chunk.1.downloading
,当整个 chunk 被加载时它会被重命名为 chunk.1
.
自定义 InputStream
将检查文件 chunk.N
是否存在(其中 N = 1...X)。如果没有,它会阻塞。每次完全下载一个块时,都会通知 InputStream
,它将检查下载的块是否是下一个要处理的块。如果是,则正常读取,否则再次阻塞。