Java线程池复杂约束数据处理

Java thread pooled complex constraints data processing

我有 Java EE(桌面)应用程序,它必须处理由多个来源(最多 200 个不同来源)生成的数据文件。每个来源定期生成 具有唯一名称 的数据文件,其中还包含该来源的唯一 ID。

我需要创建一个包含 15 个线程的线程池,用于处理和删除具有这些约束的文件:

  1. 多线程无法同时处理来自同一源的文件。
  2. 来自同一来源的多个文件应按照其创建时间戳的顺序进行处理。
  3. 不可能与文件生成器源同步,因此这意味着下一个文件可能由源生成,同时它的前一个文件正在处理或计划处理。
  4. 由于性能原因应该多线程处理(单线程处理不够,所以我打算使用 10-15 个线程)。
  5. 一次文件处理操作可能耗时3-15秒。

欢迎就池中线程的这种复杂同步的体系结构提出任何建议。

P.S。由于同时处理的限制,我之前在更简单的情况下使用的设计,即使用 ArrayBlockingQueue 不适合这种情况。

总体思路:

每个源都有一个任务队列。

你有一个中央队列,它实际上是一个由所有工作线程共享的任务队列队列。

为每个来源创建一个任务队列。并且您将这些任务队列放在基于唯一 ID 的哈希表中。通过这种方式,您可以保证按顺序处理来自同一来源的任务(要求 2)。

如果收到任务,则在哈希表中查找(或创建)任务队列并将任务添加到任务队列。如果它是第一个添加到队列中的任务,您也将其添加到中央队列中。

然后有一堆工作线程从这个中央队列中获取任务队列,然后从他们刚刚获取的任务队列中获取单个任务并处理该任务。完成任务后,他们需要决定是否需要将任务队列重新插入中央队列。

有几个部分很容易出错:

  1. 您不希望任务队列被多次插入到中央队列中。那将违反您的第一个要求。

  2. 您不希望任务队列不被重新插入中央队列,即使任务可用也是如此。

因此您需要注意适当的同步,这可能比您最初想象的要复杂一些。但是看到任务很长 运行 的事实,我会从一个常规的互斥体开始(可能是每个任务队列),例如synchronized 或锁,不用担心让它成为非阻塞。

所以这是我为解决我的问题而创建的 class 的 skeleton/tester。有关这里发生的事情的一些详细信息,请参阅@pveentjer 的回答。

package org.zur.test;

import java.io.File;
import java.util.Comparator;
import java.util.HashMap;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class PmFileProcessor {
    private static final int THREADS_COUNT = 15;

    ArrayBlockingQueue<File> scheduledFiles = new ArrayBlockingQueue<>(10000, false);
    HashMap<String, PmFileProcessingJob> allJobs = new HashMap<>();

    ScheduledExecutorService jobsExecutorService = Executors.newScheduledThreadPool(THREADS_COUNT);

    public PmFileProcessor() {
        super();

        SourceJobsManager fc = new SourceJobsManager();
        fc.setDaemon(true);
        fc.start();
    }

    public void scheduleFile(File f) {
        try {
            scheduledFiles.add(f);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    /**
     * Assigns files to file source processing job.
     *
     * @author
     *         <ul>
     *         <li>Zur13</li>
     *         </ul>
     *
     */
    public class SourceJobsManager extends Thread {

        @Override
        public void run() {
            // assigns scheduled files to per-source jobs and schedules job for additional execution
            while ( true ) {
                try {
                    File f = scheduledFiles.take();
                    PmFileProcessingJob job = getSourceJob(f);
                    job.scheduleSourceFile(f);
                    jobsExecutorService.execute(job); // schedules job execution

                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                // TODO: check disk space periodically
            }

        }

        /**
         * Finds existing job for file source or creates a new one.
         * 
         * @param f
         * @return
         */
        private PmFileProcessingJob getSourceJob(File f) {
            // TODO: test code
            String fname = f.getName();
            String[] parts = fname.split("_");
            String uid = parts[0];
            PmFileProcessingJob res = allJobs.get(uid);
            if ( res == null ) {
                res = new PmFileProcessingJob(uid);
                allJobs.put(uid, res);
            }
            return res;
        }

    }

    /**
     * Process first file from scheduledSourceFiles queue (i.e. each job execution processes a single file or
     * reschedules itself for later execution if another thread already processes the file from the same source).
     *
     * @author
     *         <ul>
     *         <li>Zur13</li>
     *         </ul>
     *
     */
    public class PmFileProcessingJob implements Runnable {
        public final String fileSourceUidString;

        PriorityQueue<File> scheduledSourceFiles = new PriorityQueue<>(1000, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                // TODO Auto-generated method stub
                return 0;
            }
        });

        Semaphore onePassSemaphore = new Semaphore(1);

        public PmFileProcessingJob(String fileSourceUid) {
            super();
            this.fileSourceUidString = fileSourceUid;
        }

        /**
         * Schedules file from for processing by this job.
         */
        public void scheduleSourceFile(File f) {
            scheduledSourceFiles.add(f);
        }

        @Override
        public void run() {
            File f = null;
            if ( scheduledSourceFiles.size() > 0 ) { // fail fast optimization 1
                if ( onePassSemaphore.tryAcquire() ) { // fail fast optimization 2
                    try {
                        f = scheduledSourceFiles.poll();
                        if ( f != null ) {
                            // TODO: process the file
                            try {
                                System.err.println(f.getName() + "\t" + Thread.currentThread().getId());
                                Thread.sleep(1000);
                                return;
                            } catch (Exception e) {
                                // TODO: handle exception
                                return; // prevents reschedule loop for failing files
                            }
                        } else {
                            // scheduledSourceFiles queue is empty
                            return;
                        }
                    } finally {
                        onePassSemaphore.release();
                    }
                }
                if ( f == null && scheduledSourceFiles.size() > 0 ) {
                    // this thread did not process the scheduled file because another thread holds the critical section
                    // pass
                    // this thread should reschedule this Job to release this thread and try to process this job later
                    // with another thread
                    // reschedule the job with 4 seconds delay to prevent excess CPU usage
                    // System.err.println("RESCHEDULE");
                    jobsExecutorService.schedule(this, 3, TimeUnit.SECONDS);
                }
            }
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((this.fileSourceUidString == null) ? 0 : this.fileSourceUidString.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if ( this == obj )
                return true;
            if ( !(obj instanceof PmFileProcessingJob) )
                return false;
            PmFileProcessingJob other = (PmFileProcessingJob) obj;
            if ( this.fileSourceUidString == null ) {
                if ( other.fileSourceUidString != null )
                    return false;
            } else if ( !this.fileSourceUidString.equals(other.fileSourceUidString) )
                return false;
            return true;
        }

    }

    public static void main(String[] args) {
        PmFileProcessor fp = new PmFileProcessor();
        fp.unitTest();
    }

    private void unitTest() {
        // TODO Auto-generated method stub
        int filesCount = 1000;
        for (int i = 0; i < filesCount; i++) {
            int sourceUid = ThreadLocalRandom.current().nextInt(1, 30);
            File f = new File(sourceUid + "_" + i);
            scheduleFile(f);
        }
        Thread.yield();
        try {
            Thread.sleep(999000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}