Reading and processing a file in two separate threads is twice as slow as doing it in one thread

I am solving the task of counting the number of unique lines in a text file. Each line is a valid IP address. The file can be of any size (literally hundreds or even thousands of GB). I wrote a simple class that implements a bit array and use it for counting.

public class IntArrayBitCounter {
    public static final long MIN_BIT_CAPACITY = 1L;
    public static final long MAX_BIT_CAPACITY = 1L << 32;

    private final int intArraySize;
    private final int[] intArray;
    private long counter;

    public IntArrayBitCounter(long bitCapacity) {
        if (bitCapacity < MIN_BIT_CAPACITY || bitCapacity > MAX_BIT_CAPACITY) {
            throw new IllegalArgumentException("Capacity must be in range [1.." + MAX_BIT_CAPACITY + "].");
        }
        this.intArraySize = 1 + (int) ((bitCapacity - 1) >> 5);
        this.intArray = new int[intArraySize];
    }

    private void checkBounds(long bitIndex) {
        // index must be strictly less than the capacity backed by the int array
        if (bitIndex < 0 || bitIndex >= ((long) intArraySize << 5)) {
            throw new IndexOutOfBoundsException("Bit index must be in range [0.." + (((long) intArraySize << 5) - 1) + "].");
        }
    }

    public void setBit(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        if ((intArray[index] & bit) == 0) {
            counter++;
            intArray[index] |= bit;
        }
    }

    public boolean isBitSets(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        return (intArray[index] & bit) != 0;
    }

    public int getIntArraySize() {
        return intArraySize;
    }

    public long getBitCapacity() {
        return (long) intArraySize << 5;
    }

    public long getCounter() {
        return counter;
    }
}
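
To make the intent clearer, here is a minimal usage sketch (illustrative only; the long literals are just 192.168.0.1 and 192.168.0.2 converted to unsigned 32-bit values):

// Illustrative only: duplicates do not increase the counter.
var counter = new IntArrayBitCounter(1L << 32);
counter.setBit(3232235521L); // 192.168.0.1
counter.setBit(3232235521L); // duplicate, counter is unchanged
counter.setBit(3232235522L); // 192.168.0.2
System.out.println(counter.getCounter()); // prints 2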

My simple single-threaded solution works well. It almost fully utilizes the read speed of my old hard drive, which is about 130-135 MB/s. The system monitor in Linux shows my program reading from the disk at about 100-110 MB/s.

import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class IpCounterApp {

    private static long toLongValue(String ipString) throws UnknownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();

        String fileName = "src/test/resources/test.txt";
        var counter = new IntArrayBitCounter(1L << 32);
        long linesProcessed = 0;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                counter.setBit(toLongValue(line));
                linesProcessed++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.printf("%d unique lines in %d processed\n", counter.getCounter(), linesProcessed);
        long elapsedTime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
    }
}

Then I tried to read from disk and process the lines in two different threads, hoping for an improvement. I created a blocking queue: the first thread reads lines and puts them into this queue, and the second thread takes them from the queue and counts. However, on a test file with 5_000_000 unique addresses out of 10_000_000 lines, the execution time nearly doubled. The read speed also dropped by half, to 50-55 MB/s.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentIpCounterApp {

    public static void main(String[] args) {
        long startTime = System.nanoTime();

        String fileName = "src/test/resources/test.txt";
        var stringsQueue = new ArrayBlockingQueue<String>(1024);
        var reader = new BlockingQueueFileReader(stringsQueue, fileName);
        var counter = new BlockingQueueCounter(stringsQueue);

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Long> linesProcessed = executorService.submit(reader);
        Future<Long> uniqueLines = executorService.submit(counter);

        try {
            System.out.printf("%d unique lines in %d processed\n", uniqueLines.get(), linesProcessed.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }

        long elapsedTime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
    }
}

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

public class BlockingQueueCounter implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final IntArrayBitCounter counter;

    public BlockingQueueCounter(BlockingQueue<String> queue) {
        this.queue = queue;
        this.counter = new IntArrayBitCounter(1L << 32);
    }

    private static long toLongValue(String ipString) throws UnknownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }
    
    @Override
    public Long call() {
        String line;
        while (true) {
            try {
                line = queue.take();
                if ("EOF".equals(line)) {
                    break;
                }
                counter.setBit(toLongValue(line));
            } catch (InterruptedException | UnknownHostException e) {
                e.printStackTrace();
            }
        }
        return counter.getCounter();
    }
}

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

public class BlockingQueueFileReader implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final String fileName;
    private long totalLines;

    public BlockingQueueFileReader(BlockingQueue<String> queue, String fileName) {
        this.queue = queue;
        this.fileName = fileName;
    }

    @Override
    public Long call() {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line);
                totalLines++;
            }
            queue.add("EOF");
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return totalLines;
    }
}

Please help me understand why this happens. I cannot find the answer on my own.

Could it be that the blocking queue, once a large chunk of data has been enqueued, blocks not only the consumer but also the producer? In that case your reading thread has to pause, and perhaps starting the next read operation means waiting for the hard drive to complete its next revolution.

How does performance change if you increase the size of the blocking queue?
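
For example, in ConcurrentIpCounterApp the queue capacity is an easy knob to turn (the value below is just an arbitrary number to experiment with, not a recommendation):

// A larger buffer so the producer blocks on put() less often.
var stringsQueue = new ArrayBlockingQueue<String>(65_536);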

So you have to make sure the reader never pauses. If the queue grows too large, increase the number of consuming threads.

To answer the question of why the multi-threaded attempt is twice as slow as the single-threaded one, try measuring:

  • the elapsed time of the whole process (you already do this)
  • the producer's active time (reading the data from disk and formatting it for the queue)
  • the producer's queue wait time (the time actually spent pushing the data into the eventually blocking queue)

I suspect that is where you will find your answer.
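
A rough sketch of how that instrumentation could look inside BlockingQueueFileReader.call() (the readNanos/waitNanos accumulators are my own additions, purely for measurement, not part of your original code):

@Override
public Long call() {
    long readNanos = 0;   // time spent reading/decoding lines from disk
    long waitNanos = 0;   // time spent blocked on queue.put()
    try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
        while (true) {
            long t0 = System.nanoTime();
            String line = reader.readLine();
            readNanos += System.nanoTime() - t0;
            if (line == null) {
                break;
            }
            long t1 = System.nanoTime();
            queue.put(line);
            waitNanos += System.nanoTime() - t1;
            totalLines++;
        }
        queue.put("EOF");
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }
    System.out.printf("producer: read %d ms, waited on queue %d ms%n",
            readNanos / 1_000_000, waitNanos / 1_000_000);
    return totalLines;
}

If the wait time dominates, the consumer is the bottleneck; if the read time dominates, the slowdown is coming from the disk itself.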