使用 Producer/Consumer 模型处理文件

Processing a file using Producer/Consumer Model

在最近删除的post中,我提出了以下问题:


我正在尝试编写一个实现 Producer/Consumer 模型的多线程程序。通常,我想使用一个生产者从文件中读取行并将它们放入 BlockingQueue,并让多个消费者在从 BlockingQueue 中检索行并将结果存储在新文件中后进行一些处理。

我希望您能给我一些反馈,告诉我应该考虑什么才能实现高性能。我花了数周时间阅读有关并发和同步的内容,因为我不想错过任何内容,但我正在寻找一些外部反馈。请在下面找到我需要的信息。

希望我没说错


你建议在提问之前先实现一些东西,所以我删除了 post 并尝试实现该模型。这是我的代码。

Producer 我有一个线程从文件中读取并将它们放入 BlockingQueue。

class Producer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    private float numline=0;


    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;


    public Producer (String location, BlockingQueue<String> blockingQueue) {
        this.location=location;
        this.blockingQueue=blockingQueue;

        try {
            bufferedReader = new BufferedReader(new FileReader(location));

            // Create the file where the processed lines will be stored
            createCluster();

        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        String line=null;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public void createCluster () {
        try {
            String clusterName=location+".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

多个线程将从 BlockingQueue 中取出并进行一些处理的消费者 'f()' 并将结果存储在新文件中。

class Consumer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    protected transient BufferedWriter bufferedWriter;

    private String clusterName;

    public Consumer (String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
        this.location=location;

        clusterName=location+".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                //Retrieve the lines
                String line = blockingQueue.take();
                String result = doNormalize (line);
                // TO DO
                //
                //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                //BufferedWriter.write(result+ "\n");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }

//Pattern pattern, Matcher matcher
    private String doNormalize(String line){
        String rules [] = getRules(); // return an array of Regex
        String tmp="";

        for (String rule : rules) {
            Pattern pattern = Pattern.compile(rule);
            Matcher matcher = pattern.matcher(line);

            if (matcher.find()){
                Set<String> namedGroups = getNamedGroupCandidates(rule);
                Iterator<String> itr = namedGroups.iterator();
                while(itr.hasNext()){
                    String value=itr.next();
                    tmp=tmp+matcher.group(value)+", ";
                }


        tmp = tmp + "\t";
                    break;
                }
            }
            return tmp;

        }
private Set<String> getNamedGroupCandidates(String regex) {
            Set<String> namedGroups = new TreeSet<String>();
            Matcher m = Pattern.compile("\(\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
            while (m.find()) {
                namedGroups.add(m.group(1));
            }
            return namedGroups;
        }
}

和我的主程序中的代码 class。使用 1 个生产者和 3 个消费者

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

            Producer readingThread = new Producer(location, queue);
            new Thread(readingThread).start();

            Consumer normalizers = new Consumer(location,queue);
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 1; i <= 3; i++) {
                executor.submit(normalizers);
            }
            System.out.println("Stopped");
            executor.shutdown();

我知道我的代码不完整,因为我需要关闭并刷新 reader 并写入等等。但是你能告诉我到目前为止我在实施 Producer/Consumer 模型时犯的错误吗?还有方法f(),它是一个处理一行并产生结果的方法,我不认为我应该同步它,因为我希望所有的消费者同时使用。

编辑

最后,这个post真的让我很困惑,它表明如果消费者将结果存储在文件中,它会减慢这个过程。这可能是个问题,因为我想要性能和速度。

最佳,

对于我的第二个问题:"The SingleConsumer to "知道“多个消费者已经完成了consuming/processing所有行。”。我从这个 post 结合这个评论得到启发: 每个消费者应该发送一个 "I terminated" 消息到队列 2,如果单个输出消费者收到所有这些,它也可以终止。

因此,对于消费者而言;这是我在 run() 方法中写的:

@Override
public void run() {
// A Consumer keeps taking elements from the queue 1, as long as the Producer is
// producing and as long as queue 1 is not empty.
    while (true) {
        try {

            //Retrieve the lines
            String line = firstBlockingQueue.take(); 
If a special terminating value is found.
            if (line==POISON_PILL) {
// The consumer notifies other consumers and the SignleConsumer that operates on queue 2
// and then terminates.
                firstBlockingQueue.put(POISON_PILL);
                secondBlockingQueue.put(SINGLE_POISIN_PILL);
                return;
            }
            // Put the normalized events on the new Queue
            String result = doNormalize (line);
            if (result!=null) {
                secondBlockingQueue.put(result);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
    }
} 

至于SinglerConsumer,它应该计算Consumers发送的"I finished processing"条消息,或者我把它当作SINGLE_POISON_PILL来使用。并在该计数器达到队列 1 中的消费者数量时终止。

while (true) {
    try {
        //Retrieve the lines
        String line = secondBlockingQueue.take();
        if (line==SINGLE_POISIN_PILL) {

            setCounter(getCounter()+1);
            if (getCounter()== threadNumber) {
                System.out.println("All "+getCounter()+" threads have finished.  \n Stopping..");
                return;
            }
        }

        try {
            if (line != SINGLE_POISIN_PILL) {
                System.out.println(line);
                bufferedWriter.write(line+"\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } 
}

对于我的第二个问题,显然我所要做的就是添加:

        if (line==SINGLE_POISIN_PILL) {
            setCounter(getCounter()+1);
            if (getCounter()== threadNumber) {
                System.out.println("All "+getCounter()+" threads have finished.  \n Stopping..");
                try {
         if (bufferedWriter != null) 
         {
             bufferedWriter.flush();
             bufferedWriter.close();
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
                return;
            }
        }

刷新并关闭缓冲区后,缓冲区开始写入。

希望得到您的反馈。