Reading and processing a file in two separate threads is twice as slow as doing it in one thread
I am solving the task of counting the number of unique lines in a text file. Each line is a valid IP address. The file can be of any size (literally hundreds or thousands of gigabytes). I wrote a simple class that implements a bit array and use it for the counting.
public class IntArrayBitCounter {

    public static final long MIN_BIT_CAPACITY = 1L;
    public static final long MAX_BIT_CAPACITY = 1L << 32;

    private final int intArraySize;
    private final int[] intArray;
    private long counter;

    public IntArrayBitCounter(long bitCapacity) {
        if (bitCapacity < MIN_BIT_CAPACITY || bitCapacity > MAX_BIT_CAPACITY) {
            throw new IllegalArgumentException("Capacity must be in range [1.." + MAX_BIT_CAPACITY + "].");
        }
        this.intArraySize = 1 + (int) ((bitCapacity - 1) >> 5);
        this.intArray = new int[intArraySize];
    }

    private void checkBounds(long bitIndex) {
        // reject indexes at or above the capacity (they would overflow intArray)
        if (bitIndex < 0 || bitIndex >= ((long) intArraySize << 5)) {
            throw new IndexOutOfBoundsException("Bit index must be in range [0.." + (MAX_BIT_CAPACITY - 1) + "].");
        }
    }

    public void setBit(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        // count a bit only the first time it is set
        if ((intArray[index] & bit) == 0) {
            counter++;
            intArray[index] |= bit;
        }
    }

    public boolean isBitSets(long bitIndex) {
        checkBounds(bitIndex);
        int index = (int) (bitIndex >> 5);
        int bit = 1 << (bitIndex & 31);
        return (intArray[index] & bit) != 0;
    }

    public int getIntArraySize() {
        return intArraySize;
    }

    public long getBitCapacity() {
        return (long) intArraySize << 5;
    }

    public long getCounter() {
        return counter;
    }
}
My simple single-threaded approach works well. It almost fully saturates the read speed of my old hard drive, which is around 130-135 MB/s. The system monitor in Linux shows roughly 100-110 MB/s being read from disk into my program.
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class IpCounterApp {

    private static long toLongValue(String ipString) throws UnknownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();
        String fileName = "src/test/resources/test.txt";
        var counter = new IntArrayBitCounter(1L << 32);
        long linesProcessed = 0;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                counter.setBit(toLongValue(line));
                linesProcessed++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.printf("%d unique lines in %d processed\n", counter.getCounter(), linesProcessed);
        long elapsedTime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
    }
}
Then I tried to read from disk and process the lines in two separate threads, hoping for an improvement. I created a blocking queue: the first thread reads lines and puts them into the queue, and the second thread takes them from the queue and does the counting.
However, on a test file of 10_000_000 addresses with 5_000_000 unique ones, execution time almost doubled. The read speed also dropped by half, to 50-55 MB/s.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentIpCounterApp {

    public static void main(String[] args) {
        long startTime = System.nanoTime();
        String fileName = "src/test/resources/test.txt";
        var stringsQueue = new ArrayBlockingQueue<String>(1024);
        var reader = new BlockingQueueFileReader(stringsQueue, fileName);
        var counter = new BlockingQueueCounter(stringsQueue);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Long> linesProcessed = executorService.submit(reader);
        Future<Long> uniqueLines = executorService.submit(counter);
        try {
            System.out.printf("%d unique lines in %d processed\n", uniqueLines.get(), linesProcessed.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
        long elapsedTime = System.nanoTime() - startTime;
        System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
    }
}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

public class BlockingQueueCounter implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final IntArrayBitCounter counter;

    public BlockingQueueCounter(BlockingQueue<String> queue) {
        this.queue = queue;
        this.counter = new IntArrayBitCounter(1L << 32);
    }

    private static long toLongValue(String ipString) throws UnknownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }

    @Override
    public Long call() {
        String line;
        while (true) {
            try {
                line = queue.take();
                if ("EOF".equals(line)) {
                    break;
                }
                counter.setBit(toLongValue(line));
            } catch (InterruptedException | UnknownHostException e) {
                e.printStackTrace();
            }
        }
        return counter.getCounter();
    }
}
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

public class BlockingQueueFileReader implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final String fileName;
    private long totalLines;

    public BlockingQueueFileReader(BlockingQueue<String> queue, String fileName) {
        this.queue = queue;
        this.fileName = fileName;
    }

    @Override
    public Long call() {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line);
                totalLines++;
            }
            queue.add("EOF");
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return totalLines;
    }
}
Please help me understand why this happens. I could not find the answer on my own.
Could it be that the blocking queue, once a chunk of data has been enqueued, blocks not only the consumer but also the sender? In that case your reading thread has to pause, and maybe starting the next read operation then means waiting for the hard drive to complete another rotation.
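To see that blocking in isolation, here is a minimal standalone demo (my own, not taken from the question) in which a producer feeds an ArrayBlockingQueue faster than a deliberately slow consumer empties it; the producer ends up spending almost all of its time inside put():

import java.util.concurrent.ArrayBlockingQueue;

// Demo: a bounded blocking queue stalls the producer as soon as it is full.
public class PutBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        var queue = new ArrayBlockingQueue<String>(4);   // tiny capacity on purpose

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    Thread.sleep(1);                     // deliberately slow consumer
                }
            } catch (InterruptedException ignored) {
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        long nanosInPut = 0;
        for (int i = 0; i < 2_000; i++) {
            long t0 = System.nanoTime();
            queue.put("192.168.0.1");                    // blocks whenever the queue is full
            nanosInPut += System.nanoTime() - t0;
        }
        System.out.println("producer spent ~" + nanosInPut / 1_000_000 + " ms inside put()");
    }
}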
What happens to the performance if you increase the size of the blocking queue?
So you have to make sure the reader never pauses. If the queue grows too large, increase the number of consuming threads.
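Increasing the queue size is a one-line change: the capacity passed to new ArrayBlockingQueue<String>(1024) in ConcurrentIpCounterApp. As a further experiment (my own suggestion, not part of the advice above), the consumer could also drain the queue in batches, so each handoff moves many lines at once and the reader blocks far less often. A sketch, reusing the IntArrayBitCounter class and the "EOF" sentinel from the question:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

// Sketch of a batch-draining consumer; a variation on BlockingQueueCounter.
public class BatchingQueueCounter implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final IntArrayBitCounter counter = new IntArrayBitCounter(1L << 32);

    public BatchingQueueCounter(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    private static long toLongValue(String ipString) throws UnknownHostException {
        long result = 0;
        for (byte b : InetAddress.getByName(ipString).getAddress())
            result = (result << 8) | (b & 255);
        return result;
    }

    @Override
    public Long call() throws InterruptedException, UnknownHostException {
        List<String> batch = new ArrayList<>(4096);
        while (true) {
            batch.clear();
            batch.add(queue.take());        // block only for the first element
            queue.drainTo(batch, 4095);     // then grab whatever else is already queued
            for (String line : batch) {
                if ("EOF".equals(line)) {
                    return counter.getCounter();
                }
                counter.setBit(toLongValue(line));
            }
        }
    }
}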
To answer the question of why the multi-threaded attempt is twice as slow as the single-threaded one, try to measure
- the elapsed time of the whole process (you already do that)
- the producer's active time (reading the data from disk and formatting it for the queue)
- the producer's queue wait time (the time spent actually stuffing the data into the eventually blocking queue)
I suppose that is where you will find your answer.
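A rough sketch of how the last two measurements could be bolted onto the question's reader (the class and variable names here are mine), timing readLine() and put() separately:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

// Sketch: the question's BlockingQueueFileReader with two extra counters, one
// for time spent reading from disk and one for time spent blocked in put().
public class TimedBlockingQueueFileReader implements Callable<Long> {

    private final BlockingQueue<String> queue;
    private final String fileName;
    private long totalLines;

    public TimedBlockingQueueFileReader(BlockingQueue<String> queue, String fileName) {
        this.queue = queue;
        this.fileName = fileName;
    }

    @Override
    public Long call() throws IOException, InterruptedException {
        long readNanos = 0;   // producer active time (disk read + decoding)
        long putNanos = 0;    // producer queue wait time
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            while (true) {
                long t0 = System.nanoTime();
                String line = reader.readLine();
                readNanos += System.nanoTime() - t0;
                if (line == null) {
                    break;
                }
                long t1 = System.nanoTime();
                queue.put(line);
                putNanos += System.nanoTime() - t1;
                totalLines++;
            }
            queue.put("EOF");
        }
        System.out.printf("reading: %d ms, waiting on the queue: %d ms%n",
                readNanos / 1_000_000, putNanos / 1_000_000);
        return totalLines;
    }
}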