写入大量文件的最佳方式
Best way to write huge number of files
我正在写很多像下面这样的文件。
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
throws Exception {
// TODO Auto-generated method stub
while (arg0.hasNext()) {
Tuple2<Text, BytesWritable> tuple2 = arg0.next();
System.out.println(tuple2._1().toString());
PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
writer.println(new String(tuple2._2().getBytes()));
writer.close();
}
}
有没有更好的方法来写入文件..而无需每次都关闭或创建 printwriter。
没有比写入大量文件更好的方法了。你正在做的事情本质上是 I/O 密集的。
更新 - 我认为@Michael Anderson 是对的。使用多个线程来写入文件(可能)会大大加快速度。然而,I/O 从几个方面来看仍将是最终瓶颈:
创建、打开和关闭文件涉及文件和目录元数据访问和更新。这需要不平凡的 CPU.
需要将文件数据和元数据更改写入光盘。那可能是多盘写入。
每个写入的文件至少有 3 个系统调用。
还有线缝的开销
除非写入每个文件的数据量很大(每个文件几千字节),否则我怀疑使用 NIO、直接缓冲区、JNI 等技术是否值得。真正的瓶颈将在内核中:文件系统操作和低级磁盘I/O。
... without closing or creating printwriter every time.
没有。您需要为每个文件创建一个新的 PrintWriter
(或 Writer
或 OutputStream
)。
然而,这...
writer.println(new String(tuple2._2().getBytes()));
...看起来很奇怪。您似乎是:
- 在
String
上调用 getBytes()
(?),
- 将字节数组转换为
String
- 调用
String
上的 println()
方法,它将复制它,并在最终输出之前将其转换回字节。
什么给了? String -> bytes -> String 转换有什么意义?
我会这样做:
writer.println(tuple2._2());
这应该会更快,但我不希望百分比加速有那么大。
我假设您正在寻找最快的方法。因为每个人都知道最快的就是最好的 ;)
一个简单的方法是使用一堆线程为您写作。
但是,除非您的文件系统可以很好地扩展,否则您不会通过这样做获得太多好处。 (我在基于 Lustre 的集群系统上使用这种技术,在 "lots of files" 可能意味着 10k 的情况下 - 在这种情况下,许多写入将转到不同的服务器/磁盘)
代码看起来像这样:(请注意,我认为这个版本不正确,因为少量文件会填满工作队列 - 但无论如何请查看下一个版本以获得更好的版本...)
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
int nThreads=5;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
int nJobs = 0;
while (arg0.hasNext()) {
++nJobs;
final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
ecs.submit(new Callable<Void>() {
@Override Void call() {
System.out.println(tuple2._1().toString());
String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(tuple2._2().getBytes()))
}
return null;
}
});
}
for(int i=0; i<nJobs; ++i) {
ecs.take().get();
}
}
更好的方法是在您拥有第一个文件的数据后立即开始写入您的文件,而不是在您拥有所有文件的数据时开始写入文件 - 并且此写入不会阻塞计算线程。
为此,您将应用程序分成几个部分,通过(线程安全的)队列进行通信。
代码最终看起来更像这样:
public void main() {
SomeMultithreadedQueue<Data> queue = ...;
int nGeneratorThreads=1;
int nWriterThreads=5;
int nThreads = nGeneratorThreads + nWriterThreads;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
AtomicInteger completedGenerators = new AtomicInteger(0);
// Start some generator threads.
for(int i=0; ++i; i<nGeneratorThreads) {
ecs.submit( () -> {
while(...) {
Data d = ... ;
queue.push(d);
}
if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
queue.push(null);
}
return null;
});
}
// Start some writer threads
for(int i=0; i<nWriterThreads; ++i) {
ecs.submit( () -> {
Data d
while((d = queue.take())!=null) {
String path = data.path();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(data.getBytes()));
}
return null;
}
});
}
for(int i=0; i<nThreads; ++i) {
ecs.take().get();
}
}
请注意,我没有提供队列的实现 class 您可以轻松包装标准 java 线程安全队列以获得您需要的。
在减少延迟等方面还有很多工作要做 - 这是我用来缩短时间的一些其他方法...
甚至不必等待为给定文件生成所有数据。传递另一个包含要写入的字节数据包的队列。
注意分配 - 您可以重复使用一些缓冲区。
nio 中存在一些延迟 - 您可以通过使用 C 写入、JNI 和直接缓冲区来提高性能。
线程切换可能会造成伤害,队列中的延迟也会造成伤害,因此您可能希望稍微批量处理数据。用 1 来平衡这个可能很棘手。
我正在写很多像下面这样的文件。
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
throws Exception {
// TODO Auto-generated method stub
while (arg0.hasNext()) {
Tuple2<Text, BytesWritable> tuple2 = arg0.next();
System.out.println(tuple2._1().toString());
PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
writer.println(new String(tuple2._2().getBytes()));
writer.close();
}
}
有没有更好的方法来写入文件..而无需每次都关闭或创建 printwriter。
没有比写入大量文件更好的方法了。你正在做的事情本质上是 I/O 密集的。
更新 - 我认为@Michael Anderson 是对的。使用多个线程来写入文件(可能)会大大加快速度。然而,I/O 从几个方面来看仍将是最终瓶颈:
创建、打开和关闭文件涉及文件和目录元数据访问和更新。这需要不平凡的 CPU.
需要将文件数据和元数据更改写入光盘。那可能是多盘写入。
每个写入的文件至少有 3 个系统调用。
还有线缝的开销
除非写入每个文件的数据量很大(每个文件几千字节),否则我怀疑使用 NIO、直接缓冲区、JNI 等技术是否值得。真正的瓶颈将在内核中:文件系统操作和低级磁盘I/O。
... without closing or creating printwriter every time.
没有。您需要为每个文件创建一个新的 PrintWriter
(或 Writer
或 OutputStream
)。
然而,这...
writer.println(new String(tuple2._2().getBytes()));
...看起来很奇怪。您似乎是:
- 在
String
上调用getBytes()
(?), - 将字节数组转换为
String
- 调用
String
上的println()
方法,它将复制它,并在最终输出之前将其转换回字节。
什么给了? String -> bytes -> String 转换有什么意义?
我会这样做:
writer.println(tuple2._2());
这应该会更快,但我不希望百分比加速有那么大。
我假设您正在寻找最快的方法。因为每个人都知道最快的就是最好的 ;)
一个简单的方法是使用一堆线程为您写作。 但是,除非您的文件系统可以很好地扩展,否则您不会通过这样做获得太多好处。 (我在基于 Lustre 的集群系统上使用这种技术,在 "lots of files" 可能意味着 10k 的情况下 - 在这种情况下,许多写入将转到不同的服务器/磁盘)
代码看起来像这样:(请注意,我认为这个版本不正确,因为少量文件会填满工作队列 - 但无论如何请查看下一个版本以获得更好的版本...)
public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
int nThreads=5;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
int nJobs = 0;
while (arg0.hasNext()) {
++nJobs;
final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
ecs.submit(new Callable<Void>() {
@Override Void call() {
System.out.println(tuple2._1().toString());
String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(tuple2._2().getBytes()))
}
return null;
}
});
}
for(int i=0; i<nJobs; ++i) {
ecs.take().get();
}
}
更好的方法是在您拥有第一个文件的数据后立即开始写入您的文件,而不是在您拥有所有文件的数据时开始写入文件 - 并且此写入不会阻塞计算线程。
为此,您将应用程序分成几个部分,通过(线程安全的)队列进行通信。
代码最终看起来更像这样:
public void main() {
SomeMultithreadedQueue<Data> queue = ...;
int nGeneratorThreads=1;
int nWriterThreads=5;
int nThreads = nGeneratorThreads + nWriterThreads;
ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
AtomicInteger completedGenerators = new AtomicInteger(0);
// Start some generator threads.
for(int i=0; ++i; i<nGeneratorThreads) {
ecs.submit( () -> {
while(...) {
Data d = ... ;
queue.push(d);
}
if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
queue.push(null);
}
return null;
});
}
// Start some writer threads
for(int i=0; i<nWriterThreads; ++i) {
ecs.submit( () -> {
Data d
while((d = queue.take())!=null) {
String path = data.path();
try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
writer.println(new String(data.getBytes()));
}
return null;
}
});
}
for(int i=0; i<nThreads; ++i) {
ecs.take().get();
}
}
请注意,我没有提供队列的实现 class 您可以轻松包装标准 java 线程安全队列以获得您需要的。
在减少延迟等方面还有很多工作要做 - 这是我用来缩短时间的一些其他方法...
甚至不必等待为给定文件生成所有数据。传递另一个包含要写入的字节数据包的队列。
注意分配 - 您可以重复使用一些缓冲区。
nio 中存在一些延迟 - 您可以通过使用 C 写入、JNI 和直接缓冲区来提高性能。
线程切换可能会造成伤害,队列中的延迟也会造成伤害,因此您可能希望稍微批量处理数据。用 1 来平衡这个可能很棘手。