运行 和多线程生产者-消费者调试模式下的不同结果

Different results in run and deburg mode with Multithread producer-consumer

在一个 JPA 项目中,我有两个线程:生产者和消费者。

生产者应该从另一个 class 获取推特查询的搜索结果并将其放入 LinkedBlockingQueue,线程消费者应该使用结果并使用另一个 class 将它们持久化到 MYSQL.

这里我先给大家展示主要的class,生产者和消费者:

主要 Class:

public class RunQuery {

    public final static EntityManagerFactory   
  emf=Persistence.createEntityManagerFactory("mypu");

    public static void main(String[] args) throws Exception

        {
            org.apache.log4j.BasicConfigurator.configure(new 
     NullAppender());

            BlockingQueue<Pair<String, QueryResult>> tweetBuffer=new 
     LinkedBlockingQueue<Pair<String,QueryResult>>();

            // Creating Producer and Consumer Thread
            Runnable producerThread = new 
         TwitterStreamProducer("producer",tweetBuffer,args);

            Runnable consumerThread = new 
       TwitterStreamConsumer("consumer",tweetBuffer);

           Thread producer=new Thread(producerThread);
           Thread consumer=new Thread(consumerThread);

           producer.start();  

           Thread.sleep(100);

            consumer.start();               
        }

生产者 线程:

   public class TwitterStreamProducer implements Runnable{

    private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
    private final ResultsController resultsController;
    private String[] Keywords;
    private String name;

    public TwitterStreamProducer(String name,
            BlockingQueue<Pair<String, QueryResult>> tweetBuffer, 
           String[] 

        keywords) {

        this.tweetBuffer=tweetBuffer;
        this.resultsController=new ResultsController();
        this.Keywords=keywords;
        this.name=name;

    }

    public void run() {

        for(String key:Keywords)
        {
        boolean interrupted = false;
        try {
            this.tweetBuffer.put( new Pair<String,QueryResult>
      (key,resultsController.search(key)) );
            Logger.getLogger("TwitterApp").info("one result added to the
       queue, current size:"+tweetBuffer.size());
        }

        catch (InterruptedException e) {
              interrupted = true;
        //  Logger.getLogger("Producer").info( this.name+ " Interrupted
            "+e.toString() );
            }
        catch(Exception e)
        {
            interrupted = true;
        }
        finally {
            if (interrupted)
                Thread.currentThread().interrupt();
                }

        }


    }

消费者 主题:

public class TwitterStreamConsumer  implements Runnable{

    private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
    private final ResultsController resultsController;
    private String name=null;

    public TwitterStreamConsumer(String name,
            BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {

        this.name=name;
        this.tweetBuffer=tweetBuffer;
        this.resultsController=new ResultsController();
    }

    public void run() {


        while(! tweetBuffer.isEmpty())
        {
        try {
            Logger.getLogger("TwitterApp").info(this.name+" has consummed
     queue, current size of queue:"+tweetBuffer.size());
            resultsController.parse(this.tweetBuffer.take());
        } catch (InterruptedException e) {
        Logger.getLogger("Consumer").info(this.name+ " interrupted   
    "+e.getMessage());
        }

        }

    }

}

如果需要其他信息,我会在这里提供。

这是 Logger 的结果,当我 运行 它时,生产者总是产生 8 个结果,之后什么都没有发生,应用程序不会被中断或不会产生任何错误:

Jul 12, 2015 12:07:22 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:1
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:2
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:3
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:4
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:5
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:6
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:7
Jul 12, 2015 12:07:26 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:8

当我在消费者线程的运行方法开始处使用断点调试时,它可以正常工作。

提前感谢您的建议。

while(! tweetBuffer.isEmpty())

如果队列为空,这将跳过循环。因此,如果生产者在将某些内容存储到队列中之前花费了一些时间,那么您的消费者不会做任何事情。摆脱那张支票。只需使用一个在线程中断时或队列包含假结束标记元素时停止的循环,并重复从队列中取出下一个元素。

你的消费者总是 运行 进入一个空队列然后死去。当你调试它时,你会自动让它等待足够长的时间让生产者填满队列。

您的消费者在第一个 运行 结束。当它第一次执行时,缓冲区应该是空的,然后它不会进入while循环并结束运行方法和Thread。

您必须循环等待缓冲区,有时,如果缓冲区为空,这不应该结束消费者。它只需等待下一条消息。 你可以用这样的代码来做到这一点:

public class TwitterStreamConsumer  implements Runnable{

    private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
    private final ResultsController resultsController;
    private String name=null;
    private boolean stopped;

    public TwitterStreamConsumer(String name, BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {

        this.name=name;
        this.tweetBuffer=tweetBuffer;
        this.resultsController=new ResultsController();
    }

    public void stop() {
        stopped = true;
    }

    public void run() {
        stopped = false;


        while(! stopped) {
            try {
                resultsController.parse(this.tweetBuffer.take());
                Logger.getLogger("TwitterApp").info(this.name+" has consummed queue, current size of queue:"+tweetBuffer.size());
            }
            catch (InterruptedException e) {
                Logger.getLogger("Consumer").info(this.name+ " interrupted "+e.getMessage());
            }
        }
    }
}

要停止线程,您必须执行类似的操作:

consumer.stop();
consumerThread.interrupt();