更快地读取大文本文件
Reading a large text file faster
我正在尝试以尽可能快的速度读取一个大文本文件。
- 不以“!”开头的行都过去了。
- 具有 8 个 CSV 的行已删除其最后一个值。
- 值中永远不会有“,”(不需要使用 opencsv)。
- 所有内容都添加到稍后解码的长字符串中。
所以这是我的代码
BufferedReader br = new BufferedReader(new FileReader("C:\Users\Documents\ais_messages1.3.txt"));
String line, aisLines="", cvsSplitBy = ",";
try {
while ((line = br.readLine()) != null) {
if(line.charAt(0) == '!') {
String[] cols = line.split(cvsSplitBy);
if(cols.length>=8) {
line = "";
for(int i=0; i<cols.length-1; i++) {
if(i == cols.length-2) {
line = line + cols[i];
} else {
line = line + cols[i] + ",";
}
}
aisLines += line + "\n";
} else {
aisLines += line + "\n";
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
所以现在它在 14 秒内读取了 36890 行。我还尝试了 InputStreamReader:
InputStreamReader isr = new InputStreamReader(new FileInputStream("C:\Users\Documents\ais_messages1.3.txt"));
BufferedReader br = new BufferedReader(isr);
并且花费了相同的时间。有没有更快的方法来读取大型文本文件(100,000 或 1,000,000 行)?
停止尝试将 aisLines
构建为大字符串。使用将行附加到的 ArrayList<String>
。在我的机器上,这需要 0.6% 的时间作为你的方法。 (这段代码在 0.75 秒内处理了 1,000,000 条简单的行。)它会减少以后处理数据所需的工作量,因为它已经按行拆分了。
BufferedReader br = new BufferedReader(new FileReader("data.txt"));
List<String> aisLines = new ArrayList<String>();
String line, cvsSplitBy = ",";
try {
while ((line = br.readLine()) != null) {
if(line.charAt(0) == '!') {
String[] cols = line.split(cvsSplitBy);
if(cols.length>=8) {
line = "";
for(int i=0; i<cols.length-1; i++) {
if(i == cols.length-2) {
line = line + cols[i];
} else {
line = line + cols[i] + ",";
}
}
aisLines.add(line);
} else {
aisLines.add(line);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
如果你真的想要一个大的 String
最后(因为你正在与其他人的代码交互,或者其他什么),将 ArrayList
转换回一个字符串,而不是做你正在做的事情。
您可以使用单线程读取大型 csv 文件并使用多线程解析所有行。我的做法是使用 Producer-Consumer
模式和 BlockingQueue。
制作人
制作一个生产者线程,它只负责读取你的 csv 文件的行,并将行存储到 BlockingQueue 中。 producer 端不做任何其他操作。
消费者
制作多个消费者线程,将相同的 BlockingQueue 对象传递给您的消费者。在您的消费者线程 class.
中实施 耗时的工作
以下代码为您提供解决问题的想法,而不是解决方案。
我是使用 python 实现的,它比使用单个线程完成所有工作要快得多。语言不是java,但背后的理论是一样的
import multiprocessing
import Queue
QUEUE_SIZE = 2000
def produce(file_queue, row_queue,):
while not file_queue.empty():
src_file = file_queue.get()
zip_reader = gzip.open(src_file, 'rb')
try:
csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER)
for row in csv_reader:
new_row = process_sdp_row(row)
if new_row:
row_queue.put(new_row)
finally:
zip_reader.close()
def consume(row_queue):
'''processes all rows, once queue is empty, break the infinit loop'''
while True:
try:
# takes a row from queue and process it
pass
except multiprocessing.TimeoutError as toe:
print "timeout, all rows have been processed, quit."
break
except Queue.Empty:
print "all rows have been processed, quit."
break
except Exception as e:
print "critical error"
print e
break
def main(args):
file_queue = multiprocessing.Queue()
row_queue = multiprocessing.Queue(QUEUE_SIZE)
file_queue.put(file1)
file_queue.put(file2)
file_queue.put(file3)
# starts 3 producers
for i in xrange(4):
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.start()
# starts 1 consumer
consumer = multiprocessing.Process(target=consume,args=(row_queue,))
consumer.start()
# blocks main thread until consumer process finished
consumer.join()
# prints statistics results after consumer is done
sys.exit(0)
if __name__ == "__main__":
main(sys.argv[1:])
由于最消耗的操作是IO,最有效的方法是拆分线程进行解析和读取:
private static void readFast(String filePath) throws IOException, InterruptedException {
ExecutorService executor = Executors.newWorkStealingPool();
BufferedReader br = new BufferedReader(new FileReader(filePath));
List<String> parsed = Collections.synchronizedList(new ArrayList<>());
try {
String line;
while ((line = br.readLine()) != null) {
final String l = line;
executor.submit(() -> {
if (l.charAt(0) == '!') {
parsed.add(parse(l));
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
executor.shutdown();
executor.awaitTermination(1000, TimeUnit.MINUTES);
String result = parsed.stream().collect(Collectors.joining("\n"));
}
我的电脑用了 386 毫秒,慢的电脑用了 10787 毫秒
我正在尝试以尽可能快的速度读取一个大文本文件。
- 不以“!”开头的行都过去了。
- 具有 8 个 CSV 的行已删除其最后一个值。
- 值中永远不会有“,”(不需要使用 opencsv)。
- 所有内容都添加到稍后解码的长字符串中。
所以这是我的代码
BufferedReader br = new BufferedReader(new FileReader("C:\Users\Documents\ais_messages1.3.txt"));
String line, aisLines="", cvsSplitBy = ",";
try {
while ((line = br.readLine()) != null) {
if(line.charAt(0) == '!') {
String[] cols = line.split(cvsSplitBy);
if(cols.length>=8) {
line = "";
for(int i=0; i<cols.length-1; i++) {
if(i == cols.length-2) {
line = line + cols[i];
} else {
line = line + cols[i] + ",";
}
}
aisLines += line + "\n";
} else {
aisLines += line + "\n";
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
所以现在它在 14 秒内读取了 36890 行。我还尝试了 InputStreamReader:
InputStreamReader isr = new InputStreamReader(new FileInputStream("C:\Users\Documents\ais_messages1.3.txt"));
BufferedReader br = new BufferedReader(isr);
并且花费了相同的时间。有没有更快的方法来读取大型文本文件(100,000 或 1,000,000 行)?
停止尝试将 aisLines
构建为大字符串。使用将行附加到的 ArrayList<String>
。在我的机器上,这需要 0.6% 的时间作为你的方法。 (这段代码在 0.75 秒内处理了 1,000,000 条简单的行。)它会减少以后处理数据所需的工作量,因为它已经按行拆分了。
BufferedReader br = new BufferedReader(new FileReader("data.txt"));
List<String> aisLines = new ArrayList<String>();
String line, cvsSplitBy = ",";
try {
while ((line = br.readLine()) != null) {
if(line.charAt(0) == '!') {
String[] cols = line.split(cvsSplitBy);
if(cols.length>=8) {
line = "";
for(int i=0; i<cols.length-1; i++) {
if(i == cols.length-2) {
line = line + cols[i];
} else {
line = line + cols[i] + ",";
}
}
aisLines.add(line);
} else {
aisLines.add(line);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
如果你真的想要一个大的 String
最后(因为你正在与其他人的代码交互,或者其他什么),将 ArrayList
转换回一个字符串,而不是做你正在做的事情。
您可以使用单线程读取大型 csv 文件并使用多线程解析所有行。我的做法是使用 Producer-Consumer
模式和 BlockingQueue。
制作人
制作一个生产者线程,它只负责读取你的 csv 文件的行,并将行存储到 BlockingQueue 中。 producer 端不做任何其他操作。
消费者
制作多个消费者线程,将相同的 BlockingQueue 对象传递给您的消费者。在您的消费者线程 class.
中实施 耗时的工作以下代码为您提供解决问题的想法,而不是解决方案。 我是使用 python 实现的,它比使用单个线程完成所有工作要快得多。语言不是java,但背后的理论是一样的
import multiprocessing
import Queue
QUEUE_SIZE = 2000
def produce(file_queue, row_queue,):
while not file_queue.empty():
src_file = file_queue.get()
zip_reader = gzip.open(src_file, 'rb')
try:
csv_reader = csv.reader(zip_reader, delimiter=SDP_DELIMITER)
for row in csv_reader:
new_row = process_sdp_row(row)
if new_row:
row_queue.put(new_row)
finally:
zip_reader.close()
def consume(row_queue):
'''processes all rows, once queue is empty, break the infinit loop'''
while True:
try:
# takes a row from queue and process it
pass
except multiprocessing.TimeoutError as toe:
print "timeout, all rows have been processed, quit."
break
except Queue.Empty:
print "all rows have been processed, quit."
break
except Exception as e:
print "critical error"
print e
break
def main(args):
file_queue = multiprocessing.Queue()
row_queue = multiprocessing.Queue(QUEUE_SIZE)
file_queue.put(file1)
file_queue.put(file2)
file_queue.put(file3)
# starts 3 producers
for i in xrange(4):
producer = multiprocessing.Process(target=produce,args=(file_queue,row_queue))
producer.start()
# starts 1 consumer
consumer = multiprocessing.Process(target=consume,args=(row_queue,))
consumer.start()
# blocks main thread until consumer process finished
consumer.join()
# prints statistics results after consumer is done
sys.exit(0)
if __name__ == "__main__":
main(sys.argv[1:])
由于最消耗的操作是IO,最有效的方法是拆分线程进行解析和读取:
private static void readFast(String filePath) throws IOException, InterruptedException {
ExecutorService executor = Executors.newWorkStealingPool();
BufferedReader br = new BufferedReader(new FileReader(filePath));
List<String> parsed = Collections.synchronizedList(new ArrayList<>());
try {
String line;
while ((line = br.readLine()) != null) {
final String l = line;
executor.submit(() -> {
if (l.charAt(0) == '!') {
parsed.add(parse(l));
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
executor.shutdown();
executor.awaitTermination(1000, TimeUnit.MINUTES);
String result = parsed.stream().collect(Collectors.joining("\n"));
}
我的电脑用了 386 毫秒,慢的电脑用了 10787 毫秒