写入大量文件的最佳方式

Best way to write huge number of files

我正在写很多像下面这样的文件。

public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
        throws Exception {
    // TODO Auto-generated method stub

    while (arg0.hasNext()) {
        Tuple2<Text, BytesWritable> tuple2 = arg0.next();
        System.out.println(tuple2._1().toString());
        PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
        writer.println(new String(tuple2._2().getBytes()));
        writer.close();
    }
}

有没有更好的方法来写入文件..而无需每次都关闭或创建 printwriter。

没有比写入大量文件更好的方法了。你正在做的事情本质上是 I/O 密集的。

更新 - 我认为@Michael Anderson 是对的。使用多个线程来写入文件(可能)会大大加快速度。然而,I/O 从几个方面来看仍将是最终瓶颈:

  • 创建、打开和关闭文件涉及文件和目录元数据访问和更新。这需要不平凡的 CPU.

  • 需要将文件数据和元数据更改写入光盘。那可能是多盘写入。

  • 每个写入的文件至少有 3 个系统调用。

  • 还有线缝的开销

除非写入每个文件的数据量很大(每个文件几千字节),否则我怀疑使用 NIO、直接缓冲区、JNI 等技术是否值得。真正的瓶颈将在内核中:文件系统操作和低级磁盘I/O。


... without closing or creating printwriter every time.

没有。您需要为每个文件创建一个新的 PrintWriter(或 WriterOutputStream)。

然而,这...

  writer.println(new String(tuple2._2().getBytes()));

...看起来很奇怪。您似乎是:

  • String 上调用 getBytes() (?),
  • 将字节数组转换为 String
  • 调用 String 上的 println() 方法,它将复制它,并在最终输出之前将其转换回字节。

什么给了? String -> bytes -> String 转换有什么意义?

我会这样做:

  writer.println(tuple2._2());

这应该会更快,但我不希望百分比加速有那么大。

我假设您正在寻找最快的方法。因为每个人都知道最快的就是最好的 ;)

一个简单的方法是使用一堆线程为您写作。 但是,除非您的文件系统可以很好地扩展,否则您不会通过这样做获得太多好处。 (我在基于 Lustre 的集群系统上使用这种技术,在 "lots of files" 可能意味着 10k 的情况下 - 在这种情况下,许多写入将转到不同的服务器/磁盘)

代码看起来像这样:(请注意,我认为这个版本不正确,因为少量文件会填满工作队列 - 但无论如何请查看下一个版本以获得更好的版本...)

public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
    int nThreads=5;
    ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
    ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);

    int nJobs = 0;

    while (arg0.hasNext()) {
        ++nJobs;
        final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
        ecs.submit(new Callable<Void>() {
          @Override Void call() {
             System.out.println(tuple2._1().toString());
             String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
             try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
               writer.println(new String(tuple2._2().getBytes()))
             }
             return null;
          }
       });
    }
    for(int i=0; i<nJobs; ++i) {
       ecs.take().get();
    }
}

更好的方法是在您拥有第一个文件的数据后立即开始写入您的文件,而不是在您拥有所有文件的数据时开始写入文件 - 并且此写入不会阻塞计算线程。

为此,您将应用程序分成几个部分,通过(线程安全的)队列进行通信。

代码最终看起来更像这样:

public void main() {
  SomeMultithreadedQueue<Data> queue = ...;

  int nGeneratorThreads=1;
  int nWriterThreads=5;
  int nThreads = nGeneratorThreads + nWriterThreads;

  ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
  ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);

  AtomicInteger completedGenerators = new AtomicInteger(0);

  // Start some generator threads.
  for(int i=0; ++i; i<nGeneratorThreads) {
    ecs.submit( () -> { 
      while(...) { 
        Data d = ... ;
        queue.push(d);
      }
      if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
        queue.push(null);
      }
      return null;
   });
  }

  // Start some writer threads
  for(int i=0; i<nWriterThreads; ++i) {
    ecs.submit( () -> { 
      Data d
      while((d = queue.take())!=null) {
        String path = data.path();
        try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
           writer.println(new String(data.getBytes()));
        }
        return null;
      }
    });
  }

  for(int i=0; i<nThreads; ++i) {
    ecs.take().get();
  }
}

请注意,我没有提供队列的实现 class 您可以轻松包装标准 java 线程安全队列以获得您需要的。

在减少延迟等方面还有很多工作要做 - 这是我用来缩短时间的一些其他方法...

  1. 甚至不必等待为给定文件生成所有数据。传递另一个包含要写入的字节数据包的队列。

  2. 注意分配 - 您可以重复使用一些缓冲区。

  3. nio 中存在一些延迟 - 您可以通过使用 C 写入、JNI 和直接缓冲区来提高性能。

  4. 线程切换可能会造成伤害,队列中的延迟也会造成伤害,因此您可能希望稍微批量处理数据。用 1 来平衡这个可能很棘手。