Java 线程间通信不传递消息对象

Java Interthread Communication not passing message object

为了在线程之间进行通信,我遵循了易于编译的 Oracle Guarded Blocks example 和 运行s。我的架构略有不同,因为我的消费者产生了生产者任务,尽管我在示例中尝试了这种变体并且它运行完美。

我的主程序中的相关代码;

public static void main(String[] args) {
...
    FrameMsg frameMsg = new FrameMsg();
    AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(frameMsg);
    awarenessAnalytic.start();

来自消费者线程的相关代码;

public class AwarenessAnalytics extends Thread implements MotionEventListener{
    FrameMsg frameMsg;
    FrameWithMotionDetection frameWithMotionDetection;

      public AwarenessAnalytics(FrameMsg frameMsg) {
        this.frameMsg = frameMsg;
        System.out.println("AwarenessAnalytic frameMsg = " + this.frameMsg.hashCode());
        }
 AdvancedVideoAnalytics tempIntermediateVA;
 tempIntermediateVA = new AdvancedVideoAnalytics(frameMsg);

public void run() {

    tempIntermediateVA.start();

    while (true) {
        // TODO: create loop to process frames from each video stream
        frameWithMotionDetection = new FrameWithMotionDetection();
        // interthread message from AdvancedAnalytic
        System.out.println("Waiting for FrameMsg");
        frameWithMotionDetection = frameMsg.take();
        System.out.println("FrameMsg received");
}

来自生产者任务的相关代码;

public class AdvancedVideoAnalytics extends Thread {
  FrameMsg frameMsg;
  FrameWithMotionDetection frameWithMotionDetection;

public AdvancedVideoAnalytics (FrameMsg frameMsg) {
    this.frameMsg = frameMsg;
    System.out.println("AdvancedVideoAnalytic frameMsg = " + this.frameMsg.hashCode());
 }

// the run method includes;

// Send frame and any clusters detected
// as frameMsg
frameWithMotionDetection = new FrameWithMotionDetection();

frameWithMotionDetection.setMotionData(contourAnalysisResults);

frameWithMotionDetection.setCurrentFrame(frameToExamine);
System.out.println("Preparing to send message to AwarenessAnalytics thread");
frameMsg.put(frameWithMotionDetection);

FrameMsg class;

public class FrameMsg {
// Message sent from video stream monitors to analytic fusion engine

private FrameWithMotionDetection frameWithMotionData;

//private String message;
// True if consumer should wait
// for producer to send message,
// false if producer should wait for
// consumer to retrieve message.
private boolean empty = true;

public synchronized FrameWithMotionDetection take() {
    // Wait until message is
    // available.
    System.out.println("Getting ready to take frameWithMotionData");
    while (empty) {
        try {
            wait(10);
            System.out.println("Waiting to take frameWithMotionData because empty = true");
        } catch (InterruptedException e) {}
    }
    // Toggle status.
    empty = true;
    System.out.println("Successfully took frameWithMotionData, empty = " + empty);
    // Notify producer that
    // status has changed.
    notifyAll();
    return frameWithMotionData;
}

public synchronized void put(FrameWithMotionDetection frameWithMotionData) {
    // Wait until message has
    // been retrieved.
    System.out.println("Getting ready to put frameWithMotionData");
    while (!empty) {
        try { 
            System.out.println("Waiting to put frameWithMotionData because empty = false");
            wait();
        } catch (InterruptedException e) {}
    }
    // Toggle status.
    empty = false;
    // Store message.
    this.frameWithMotionData = frameWithMotionData;
    System.out.println("Successfully put frameWithMotionData, empty = " + empty);
    // Notify consumer that status
    // has changed.
    notifyAll();
}

}

有趣的是,所有的 frameMsg 对象 ID 都是相同的,我能够 'put' 一个 frameMsg 并将生产者的空设置为 false。但是消费者看到的frameMsg对象总是returns'true'为空

输出摘录如下;

VideoAnalyticsUnitTest frameMsg = 1704856573
AwarenessAnalytic frameMsg = 1704856573
AdvancedVideoAnalytic frameMsg = 1704856573

Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
(many of these)...
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Successfully put frameWithMotionData, empty = false
Waiting to take frameWithMotionData because empty = true
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Waiting to put frameWithMotionData because empty = false
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true

它像最后三行一样继续,直到我终止程序。

我很困惑,因为; 1.我按照这个例子 2.对象ID匹配

然而,消费者永远不会看到非空的 frameMsg(这是一个复杂的对象)。

我是不是漏掉了一些明显的东西?

我最初使用侦听器发送消息,但我不想让一个庞大的应用程序占用侦听器 space。现在阅读更多评论,似乎我可以使用侦听器并将消息传递给具有阻塞队列的消费者的 运行 部分。

如果是您,您会采用上述通信方法,还是恢复为具有阻塞队列的侦听器?

如果要更新 empty 参数,请不要使用 synchronized 方法,而是使用 synchronized

synchronized(this){
//work from here for core logic 

}
//empty =true logic or empty = false

块确实比方法有优势,最重要的是灵活性,因为您可以使用其他对象作为锁,而同步方法将锁定整个 class。

比较:

// locks the whole object
... 
private synchronized void someInputRelatedWork() {
... 
}
private synchronized void someOutputRelatedWork() {
... 
}

// Using specific locks
Object inputLock = new Object();
Object outputLock = new Object();

private void someInputRelatedWork() {
synchronize(inputLock) { 
    ... 
} 
}
private void someOutputRelatedWork() {
synchronize(outputLock) { 
    ... 
}
}

此外,如果方法增长,您仍然可以保持同步部分分离:

 private void method() {
 ... code here
 ... code here
 ... code here
 synchronized( lock ) { 
    ...very few lines of code here
 }
 ... code here
 ... code here
 ... code here
 ... code here
}

最后的建议是使用 LinkedBlockingQueue 因为它有很好的性能和好的方法 put and take

正如@Bhargav Modi 指出的那样,代码 运行 涉及编写多线程应用程序的更精细问题(同步块与 - 方法,在关键变量上使用 volatile 声明)。这些问题在测试过程中经常被遗漏,因为需要一些偶然因素才能使问题出现(最臭名昭著的问题之一是 double checked locking)。

这是使用 Java concurrent classes: there is less chance of writing code that is not thread-safe or has multi-threading issues. In your case, the SynchronousQueue 看起来不错的替代品的充分理由。使用 SynchronousQueue,无需使用 empty 变量、this.frameWithMotionData 变量或 wait/notifyAll 机制。