在同一文件上的 FileOutputStream 关闭之前,我可以使 FileInputStream.read 阻塞吗?
Can I make FileInputStream.read block until FileOutputStream on the same file is closed?
在我的应用程序中,我正在接收要存储在文件中并对其进行一些计算的数据。接收和计算都可能持续很长时间,所以我想异步进行。
下面的列表显示了我的基本设置:thread1
生成一些数据并将它们存储在一个文件中。
thread2
读取文件并处理数据。
Thread thread1 = new Thread( () -> {
try {
BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
for( int i = 0; i < 10; i++ ) {
//producing data...
out.write( ( "hello " + i + "\n" ).getBytes() );
out.flush();
//Thread.sleep( 10 );
}
out.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread1.start();
Thread thread2 = new Thread( () -> {
try {
BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
int b = in.read();
while( b != -1 ) {
//do some calculation with data
System.out.print( (char)b );
b = in.read();
}
in.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread2.start();
根据这个问题,我想在同一个文件上同时读取和写入是可以的:FileInputStream and FileOutputStream to the same file: Is a read() guaranteed to see all write()s that "happened before"? 或者我在这里遗漏了什么?
执行上面的清单会产生预期的输出:
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
但是如果 reader 线程由于某种原因比编写器快(可以通过取消注释线程 1 中的 Thread.sleep 行来模拟),reader 读取 EOF (-1) 并在文件完全写入之前完成。只放了一行:
hello 0
然而作者仍然在 "test" 文件中生成整个输出。
现在我想让 in.read()
阻塞,直到线程 1 中的 FileOutputStream 关闭。
我认为这可以通过避免将 EOF 放在文件末尾直到 out
关闭来完成。这是真的吗?如果是,我该怎么做?或者有更好的方法吗?
A reader(消费者)可以等待写入者(生产者),即使接口是一个文件。但总的来说,使用队列并遵循 producer/consumer 模式会更好。
无论如何,在这种情况下,粗略的 "wait for more input" 过程只涉及两个 Atomic
值:
- 一个用于跟踪写入的字节数 (
AtomicInteger
)
- 一个表示没有更多字节可用(
AtomicBoolean
)
原子变量可以在线程之间共享:两个线程将始终看到原子值的最新值。
然后,作者可以通过 AtomicInteger
更新写入的字节数,然后 reader 可以决定等待更多输入。
编写器还可以通过 AtomicBoolean
指示是否不再写入字节,并且 reader 可以使用该信息读取到文件末尾。
要记住的另一件事是启动线程不受您的控制:您的操作系统将决定线程何时真正启动 运行。
要同时为线程提供 运行 的合理机会,请使用 "startLatch",如下面的代码所示。
下面的演示代码是可运行的,应该可以很好地说明如何使 reader 线程等待来自编写器线程的更多输入。
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class ReadWhileWrite {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
CountDownLatch startLatch = new CountDownLatch(2);
Path testFile = Paths.get("test-read-while-write.txt");
testFile.toFile().delete();
int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes.
CountDownLatch testFileExists = new CountDownLatch(1);
AtomicInteger bytesWritten = new AtomicInteger();
AtomicBoolean writeFinished = new AtomicBoolean();
// Writer
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus reader start.");
}
try (OutputStream out = Files.newOutputStream(testFile)) {
testFileExists.countDown();
int maxLoops = 10;
IntStream.range(0, maxLoops).forEach(i -> {
byte[] msg = ("hello " + i + "\n").getBytes(StandardCharsets.UTF_8);
try {
out.write(msg);
out.flush();
bytesWritten.addAndGet(msg.length);
} catch (IOException e) {
e.printStackTrace();
}
if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
try {
Thread.sleep(fakeSlowWriteMs);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
writeFinished.set(true);
});
// Reader
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus writer start.");
}
int bytesRead = 0;
int bytesRequired = 1; // Number of bytes read from file in one go.
int maxWaitTimeMs = 1000;
if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Writer did not open file for reading within 500 ms.");
}
try (InputStream in = Files.newInputStream(testFile)) {
boolean eof = false;
while (!eof) {
if (!writeFinished.get()) {
if (bytesWritten.get() - bytesRead < bytesRequired) {
int sleepTimeTotal = 0;
while (!writeFinished.get()) {
Thread.sleep(1);
if (bytesWritten.get() - bytesRead >= bytesRequired) {
break; // break the waiting loop, read the available bytes.
}
sleepTimeTotal += 1;
if (sleepTimeTotal >= maxWaitTimeMs) {
throw new RuntimeException("No bytes available to read within waiting time.");
}
}
}
}
int b = in.read();
bytesRead += 1;
if (b < 0) {
eof = true;
} else {
System.out.print( (char) b);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
doneLatch.countDown();
});
if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
}
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdownNow();
System.out.println("\nFinished.");
}
}
在我的应用程序中,我正在接收要存储在文件中并对其进行一些计算的数据。接收和计算都可能持续很长时间,所以我想异步进行。
下面的列表显示了我的基本设置:thread1
生成一些数据并将它们存储在一个文件中。
thread2
读取文件并处理数据。
Thread thread1 = new Thread( () -> {
try {
BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
for( int i = 0; i < 10; i++ ) {
//producing data...
out.write( ( "hello " + i + "\n" ).getBytes() );
out.flush();
//Thread.sleep( 10 );
}
out.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread1.start();
Thread thread2 = new Thread( () -> {
try {
BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
int b = in.read();
while( b != -1 ) {
//do some calculation with data
System.out.print( (char)b );
b = in.read();
}
in.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread2.start();
根据这个问题,我想在同一个文件上同时读取和写入是可以的:FileInputStream and FileOutputStream to the same file: Is a read() guaranteed to see all write()s that "happened before"? 或者我在这里遗漏了什么?
执行上面的清单会产生预期的输出:
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
但是如果 reader 线程由于某种原因比编写器快(可以通过取消注释线程 1 中的 Thread.sleep 行来模拟),reader 读取 EOF (-1) 并在文件完全写入之前完成。只放了一行:
hello 0
然而作者仍然在 "test" 文件中生成整个输出。
现在我想让 in.read()
阻塞,直到线程 1 中的 FileOutputStream 关闭。
我认为这可以通过避免将 EOF 放在文件末尾直到 out
关闭来完成。这是真的吗?如果是,我该怎么做?或者有更好的方法吗?
A reader(消费者)可以等待写入者(生产者),即使接口是一个文件。但总的来说,使用队列并遵循 producer/consumer 模式会更好。
无论如何,在这种情况下,粗略的 "wait for more input" 过程只涉及两个 Atomic
值:
- 一个用于跟踪写入的字节数 (
AtomicInteger
) - 一个表示没有更多字节可用(
AtomicBoolean
)
原子变量可以在线程之间共享:两个线程将始终看到原子值的最新值。
然后,作者可以通过 AtomicInteger
更新写入的字节数,然后 reader 可以决定等待更多输入。
编写器还可以通过 AtomicBoolean
指示是否不再写入字节,并且 reader 可以使用该信息读取到文件末尾。
要记住的另一件事是启动线程不受您的控制:您的操作系统将决定线程何时真正启动 运行。 要同时为线程提供 运行 的合理机会,请使用 "startLatch",如下面的代码所示。
下面的演示代码是可运行的,应该可以很好地说明如何使 reader 线程等待来自编写器线程的更多输入。
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class ReadWhileWrite {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
CountDownLatch startLatch = new CountDownLatch(2);
Path testFile = Paths.get("test-read-while-write.txt");
testFile.toFile().delete();
int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes.
CountDownLatch testFileExists = new CountDownLatch(1);
AtomicInteger bytesWritten = new AtomicInteger();
AtomicBoolean writeFinished = new AtomicBoolean();
// Writer
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus reader start.");
}
try (OutputStream out = Files.newOutputStream(testFile)) {
testFileExists.countDown();
int maxLoops = 10;
IntStream.range(0, maxLoops).forEach(i -> {
byte[] msg = ("hello " + i + "\n").getBytes(StandardCharsets.UTF_8);
try {
out.write(msg);
out.flush();
bytesWritten.addAndGet(msg.length);
} catch (IOException e) {
e.printStackTrace();
}
if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
try {
Thread.sleep(fakeSlowWriteMs);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
writeFinished.set(true);
});
// Reader
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus writer start.");
}
int bytesRead = 0;
int bytesRequired = 1; // Number of bytes read from file in one go.
int maxWaitTimeMs = 1000;
if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Writer did not open file for reading within 500 ms.");
}
try (InputStream in = Files.newInputStream(testFile)) {
boolean eof = false;
while (!eof) {
if (!writeFinished.get()) {
if (bytesWritten.get() - bytesRead < bytesRequired) {
int sleepTimeTotal = 0;
while (!writeFinished.get()) {
Thread.sleep(1);
if (bytesWritten.get() - bytesRead >= bytesRequired) {
break; // break the waiting loop, read the available bytes.
}
sleepTimeTotal += 1;
if (sleepTimeTotal >= maxWaitTimeMs) {
throw new RuntimeException("No bytes available to read within waiting time.");
}
}
}
}
int b = in.read();
bytesRead += 1;
if (b < 0) {
eof = true;
} else {
System.out.print( (char) b);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
doneLatch.countDown();
});
if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
}
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdownNow();
System.out.println("\nFinished.");
}
}