在同一文件上的 FileOutputStream 关​​闭之前,我可以使 FileInputStream.read 阻塞吗?

Can I make FileInputStream.read block until FileOutputStream on the same file is closed?

在我的应用程序中,我正在接收要存储在文件中并对其进行一些计算的数据。接收和计算都可能持续很长时间,所以我想异步进行。 下面的列表显示了我的基本设置:thread1 生成一些数据并将它们存储在一个文件中。 thread2 读取文件并处理数据。

    Thread thread1 = new Thread( () -> {
      try {
        BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
        for( int i = 0; i < 10; i++ ) {
          //producing data...
          out.write( ( "hello " + i + "\n" ).getBytes() );
          out.flush();
          //Thread.sleep( 10 );
        }
        out.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread1.start();

    Thread thread2 = new Thread( () -> {
      try {
        BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
        int b = in.read();
        while( b != -1 ) {
          //do some calculation with data
          System.out.print( (char)b );
          b = in.read();
        }
        in.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread2.start();

根据这个问题,我想在同一个文件上同时读取和写入是可以的:FileInputStream and FileOutputStream to the same file: Is a read() guaranteed to see all write()s that "happened before"? 或者我在这里遗漏了什么?

执行上面的清单会产生预期的输出:

hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

但是如果 reader 线程由于某种原因比编写器快(可以通过取消注释线程 1 中的 Thread.sleep 行来模拟),reader 读取 EOF (-1) 并在文件完全写入之前完成。只放了一行:

hello 0

然而作者仍然在 "test" 文件中生成整个输出。

现在我想让 in.read() 阻塞,直到线程 1 中的 FileOutputStream 关​​闭。 我认为这可以通过避免将 EOF 放在文件末尾直到 out 关闭来完成。这是真的吗?如果是,我该怎么做?或者有更好的方法吗?

A reader(消费者)可以等待写入者(生产者),即使接口是一个文件。但总的来说,使用队列并遵循 producer/consumer 模式会更好。

无论如何,在这种情况下,粗略的 "wait for more input" 过程只涉及两个 Atomic 值:

  • 一个用于跟踪写入的字节数 (AtomicInteger)
  • 一个表示没有更多字节可用(AtomicBoolean)

原子变量可以在线程之间共享:两个线程将始终看到原子值的最新值。 然后,作者可以通过 AtomicInteger 更新写入的字节数,然后 reader 可以决定等待更多输入。 编写器还可以通过 AtomicBoolean 指示是否不再写入字节,并且 reader 可以使用该信息读取到文件末尾。

要记住的另一件事是启动线程不受您的控制:您的操作系统将决定线程何时真正启动 运行。 要同时为线程提供 运行 的合理机会,请使用 "startLatch",如下面的代码所示。

下面的演示代码是可运行的,应该可以很好地说明如何使 reader 线程等待来自编写器线程的更多输入。


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ReadWhileWrite {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch startLatch = new CountDownLatch(2);
            Path testFile = Paths.get("test-read-while-write.txt");
            testFile.toFile().delete();
            int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes. 

            CountDownLatch testFileExists = new CountDownLatch(1);
            AtomicInteger bytesWritten = new AtomicInteger();
            AtomicBoolean writeFinished = new AtomicBoolean();

            // Writer
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus reader start.");
                    }
                    try (OutputStream out = Files.newOutputStream(testFile)) {
                        testFileExists.countDown();
                        int maxLoops = 10;
                        IntStream.range(0, maxLoops).forEach(i -> {
                            byte[] msg = ("hello " + i + "\n").getBytes(StandardCharsets.UTF_8);
                            try {
                                out.write(msg);
                                out.flush();
                                bytesWritten.addAndGet(msg.length);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
                                try {
                                    Thread.sleep(fakeSlowWriteMs);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writeFinished.set(true);
            });
            // Reader
            CountDownLatch doneLatch = new CountDownLatch(1);
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus writer start.");
                    }
                    int bytesRead = 0;
                    int bytesRequired = 1; // Number of bytes read from file in one go.
                    int maxWaitTimeMs = 1000;
                    if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Writer did not open file for reading within 500 ms.");
                    }
                    try (InputStream in = Files.newInputStream(testFile)) {
                        boolean eof = false;
                        while (!eof) {
                            if (!writeFinished.get()) {
                                if (bytesWritten.get() - bytesRead < bytesRequired) {
                                    int sleepTimeTotal = 0;
                                    while (!writeFinished.get()) {
                                        Thread.sleep(1);
                                        if (bytesWritten.get() - bytesRead >= bytesRequired) {
                                            break; // break the waiting loop, read the available bytes.
                                        }
                                        sleepTimeTotal += 1;
                                        if (sleepTimeTotal >= maxWaitTimeMs) {
                                            throw new RuntimeException("No bytes available to read within waiting time.");
                                        }
                                    }
                                }
                            }
                            int b = in.read();
                            bytesRead += 1;
                            if (b < 0) {
                                eof = true;
                            } else {
                                System.out.print( (char) b);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                doneLatch.countDown();
            });
            if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        executor.shutdownNow();
        System.out.println("\nFinished.");
    }
}