Java 线程间通信不传递消息对象
Java Interthread Communication not passing message object
为了在线程之间进行通信,我遵循了易于编译的 Oracle Guarded Blocks example 和 运行s。我的架构略有不同,因为我的消费者产生了生产者任务,尽管我在示例中尝试了这种变体并且它运行完美。
我的主程序中的相关代码;
public static void main(String[] args) {
...
FrameMsg frameMsg = new FrameMsg();
AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(frameMsg);
awarenessAnalytic.start();
来自消费者线程的相关代码;
public class AwarenessAnalytics extends Thread implements MotionEventListener{
FrameMsg frameMsg;
FrameWithMotionDetection frameWithMotionDetection;
public AwarenessAnalytics(FrameMsg frameMsg) {
this.frameMsg = frameMsg;
System.out.println("AwarenessAnalytic frameMsg = " + this.frameMsg.hashCode());
}
AdvancedVideoAnalytics tempIntermediateVA;
tempIntermediateVA = new AdvancedVideoAnalytics(frameMsg);
public void run() {
tempIntermediateVA.start();
while (true) {
// TODO: create loop to process frames from each video stream
frameWithMotionDetection = new FrameWithMotionDetection();
// interthread message from AdvancedAnalytic
System.out.println("Waiting for FrameMsg");
frameWithMotionDetection = frameMsg.take();
System.out.println("FrameMsg received");
}
来自生产者任务的相关代码;
public class AdvancedVideoAnalytics extends Thread {
FrameMsg frameMsg;
FrameWithMotionDetection frameWithMotionDetection;
public AdvancedVideoAnalytics (FrameMsg frameMsg) {
this.frameMsg = frameMsg;
System.out.println("AdvancedVideoAnalytic frameMsg = " + this.frameMsg.hashCode());
}
// the run method includes;
// Send frame and any clusters detected
// as frameMsg
frameWithMotionDetection = new FrameWithMotionDetection();
frameWithMotionDetection.setMotionData(contourAnalysisResults);
frameWithMotionDetection.setCurrentFrame(frameToExamine);
System.out.println("Preparing to send message to AwarenessAnalytics thread");
frameMsg.put(frameWithMotionDetection);
FrameMsg class;
public class FrameMsg {
// Message sent from video stream monitors to analytic fusion engine
private FrameWithMotionDetection frameWithMotionData;
//private String message;
// True if consumer should wait
// for producer to send message,
// false if producer should wait for
// consumer to retrieve message.
private boolean empty = true;
public synchronized FrameWithMotionDetection take() {
// Wait until message is
// available.
System.out.println("Getting ready to take frameWithMotionData");
while (empty) {
try {
wait(10);
System.out.println("Waiting to take frameWithMotionData because empty = true");
} catch (InterruptedException e) {}
}
// Toggle status.
empty = true;
System.out.println("Successfully took frameWithMotionData, empty = " + empty);
// Notify producer that
// status has changed.
notifyAll();
return frameWithMotionData;
}
public synchronized void put(FrameWithMotionDetection frameWithMotionData) {
// Wait until message has
// been retrieved.
System.out.println("Getting ready to put frameWithMotionData");
while (!empty) {
try {
System.out.println("Waiting to put frameWithMotionData because empty = false");
wait();
} catch (InterruptedException e) {}
}
// Toggle status.
empty = false;
// Store message.
this.frameWithMotionData = frameWithMotionData;
System.out.println("Successfully put frameWithMotionData, empty = " + empty);
// Notify consumer that status
// has changed.
notifyAll();
}
}
有趣的是,所有的 frameMsg 对象 ID 都是相同的,我能够 'put' 一个 frameMsg 并将生产者的空设置为 false。但是消费者看到的frameMsg对象总是returns'true'为空
输出摘录如下;
VideoAnalyticsUnitTest frameMsg = 1704856573
AwarenessAnalytic frameMsg = 1704856573
AdvancedVideoAnalytic frameMsg = 1704856573
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
(many of these)...
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Successfully put frameWithMotionData, empty = false
Waiting to take frameWithMotionData because empty = true
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Waiting to put frameWithMotionData because empty = false
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
它像最后三行一样继续,直到我终止程序。
我很困惑,因为;
1.我按照这个例子
2.对象ID匹配
然而,消费者永远不会看到非空的 frameMsg(这是一个复杂的对象)。
我是不是漏掉了一些明显的东西?
我最初使用侦听器发送消息,但我不想让一个庞大的应用程序占用侦听器 space。现在阅读更多评论,似乎我可以使用侦听器并将消息传递给具有阻塞队列的消费者的 运行 部分。
如果是您,您会采用上述通信方法,还是恢复为具有阻塞队列的侦听器?
如果要更新 empty
参数,请不要使用 synchronized
方法,而是使用
synchronized
块
synchronized(this){
//work from here for core logic
}
//empty =true logic or empty = false
块确实比方法有优势,最重要的是灵活性,因为您可以使用其他对象作为锁,而同步方法将锁定整个 class。
比较:
// locks the whole object
...
private synchronized void someInputRelatedWork() {
...
}
private synchronized void someOutputRelatedWork() {
...
}
比
// Using specific locks
Object inputLock = new Object();
Object outputLock = new Object();
private void someInputRelatedWork() {
synchronize(inputLock) {
...
}
}
private void someOutputRelatedWork() {
synchronize(outputLock) {
...
}
}
此外,如果方法增长,您仍然可以保持同步部分分离:
private void method() {
... code here
... code here
... code here
synchronized( lock ) {
...very few lines of code here
}
... code here
... code here
... code here
... code here
}
最后的建议是使用 LinkedBlockingQueue
因为它有很好的性能和好的方法 put and take
正如@Bhargav Modi 指出的那样,代码 运行 涉及编写多线程应用程序的更精细问题(同步块与 - 方法,在关键变量上使用 volatile
声明)。这些问题在测试过程中经常被遗漏,因为需要一些偶然因素才能使问题出现(最臭名昭著的问题之一是 double checked locking)。
这是使用 Java concurrent classes: there is less chance of writing code that is not thread-safe or has multi-threading issues. In your case, the SynchronousQueue 看起来不错的替代品的充分理由。使用 SynchronousQueue,无需使用 empty
变量、this.frameWithMotionData
变量或 wait/notifyAll 机制。
为了在线程之间进行通信,我遵循了易于编译的 Oracle Guarded Blocks example 和 运行s。我的架构略有不同,因为我的消费者产生了生产者任务,尽管我在示例中尝试了这种变体并且它运行完美。
我的主程序中的相关代码;
public static void main(String[] args) {
...
FrameMsg frameMsg = new FrameMsg();
AwarenessAnalytics awarenessAnalytic = new AwarenessAnalytics(frameMsg);
awarenessAnalytic.start();
来自消费者线程的相关代码;
public class AwarenessAnalytics extends Thread implements MotionEventListener{
FrameMsg frameMsg;
FrameWithMotionDetection frameWithMotionDetection;
public AwarenessAnalytics(FrameMsg frameMsg) {
this.frameMsg = frameMsg;
System.out.println("AwarenessAnalytic frameMsg = " + this.frameMsg.hashCode());
}
AdvancedVideoAnalytics tempIntermediateVA;
tempIntermediateVA = new AdvancedVideoAnalytics(frameMsg);
public void run() {
tempIntermediateVA.start();
while (true) {
// TODO: create loop to process frames from each video stream
frameWithMotionDetection = new FrameWithMotionDetection();
// interthread message from AdvancedAnalytic
System.out.println("Waiting for FrameMsg");
frameWithMotionDetection = frameMsg.take();
System.out.println("FrameMsg received");
}
来自生产者任务的相关代码;
public class AdvancedVideoAnalytics extends Thread {
FrameMsg frameMsg;
FrameWithMotionDetection frameWithMotionDetection;
public AdvancedVideoAnalytics (FrameMsg frameMsg) {
this.frameMsg = frameMsg;
System.out.println("AdvancedVideoAnalytic frameMsg = " + this.frameMsg.hashCode());
}
// the run method includes;
// Send frame and any clusters detected
// as frameMsg
frameWithMotionDetection = new FrameWithMotionDetection();
frameWithMotionDetection.setMotionData(contourAnalysisResults);
frameWithMotionDetection.setCurrentFrame(frameToExamine);
System.out.println("Preparing to send message to AwarenessAnalytics thread");
frameMsg.put(frameWithMotionDetection);
FrameMsg class;
public class FrameMsg {
// Message sent from video stream monitors to analytic fusion engine
private FrameWithMotionDetection frameWithMotionData;
//private String message;
// True if consumer should wait
// for producer to send message,
// false if producer should wait for
// consumer to retrieve message.
private boolean empty = true;
public synchronized FrameWithMotionDetection take() {
// Wait until message is
// available.
System.out.println("Getting ready to take frameWithMotionData");
while (empty) {
try {
wait(10);
System.out.println("Waiting to take frameWithMotionData because empty = true");
} catch (InterruptedException e) {}
}
// Toggle status.
empty = true;
System.out.println("Successfully took frameWithMotionData, empty = " + empty);
// Notify producer that
// status has changed.
notifyAll();
return frameWithMotionData;
}
public synchronized void put(FrameWithMotionDetection frameWithMotionData) {
// Wait until message has
// been retrieved.
System.out.println("Getting ready to put frameWithMotionData");
while (!empty) {
try {
System.out.println("Waiting to put frameWithMotionData because empty = false");
wait();
} catch (InterruptedException e) {}
}
// Toggle status.
empty = false;
// Store message.
this.frameWithMotionData = frameWithMotionData;
System.out.println("Successfully put frameWithMotionData, empty = " + empty);
// Notify consumer that status
// has changed.
notifyAll();
}
}
有趣的是,所有的 frameMsg 对象 ID 都是相同的,我能够 'put' 一个 frameMsg 并将生产者的空设置为 false。但是消费者看到的frameMsg对象总是returns'true'为空
输出摘录如下;
VideoAnalyticsUnitTest frameMsg = 1704856573
AwarenessAnalytic frameMsg = 1704856573
AdvancedVideoAnalytic frameMsg = 1704856573
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
(many of these)...
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Successfully put frameWithMotionData, empty = false
Waiting to take frameWithMotionData because empty = true
Preparing to send message to AwarenessAnalytics thread
Getting ready to put frameWithMotionData
Waiting to put frameWithMotionData because empty = false
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
Waiting to take frameWithMotionData because empty = true
它像最后三行一样继续,直到我终止程序。
我很困惑,因为; 1.我按照这个例子 2.对象ID匹配
然而,消费者永远不会看到非空的 frameMsg(这是一个复杂的对象)。
我是不是漏掉了一些明显的东西?
我最初使用侦听器发送消息,但我不想让一个庞大的应用程序占用侦听器 space。现在阅读更多评论,似乎我可以使用侦听器并将消息传递给具有阻塞队列的消费者的 运行 部分。
如果是您,您会采用上述通信方法,还是恢复为具有阻塞队列的侦听器?
如果要更新 empty
参数,请不要使用 synchronized
方法,而是使用
synchronized
块
synchronized(this){
//work from here for core logic
}
//empty =true logic or empty = false
块确实比方法有优势,最重要的是灵活性,因为您可以使用其他对象作为锁,而同步方法将锁定整个 class。
比较:
// locks the whole object
...
private synchronized void someInputRelatedWork() {
...
}
private synchronized void someOutputRelatedWork() {
...
}
比
// Using specific locks
Object inputLock = new Object();
Object outputLock = new Object();
private void someInputRelatedWork() {
synchronize(inputLock) {
...
}
}
private void someOutputRelatedWork() {
synchronize(outputLock) {
...
}
}
此外,如果方法增长,您仍然可以保持同步部分分离:
private void method() {
... code here
... code here
... code here
synchronized( lock ) {
...very few lines of code here
}
... code here
... code here
... code here
... code here
}
最后的建议是使用 LinkedBlockingQueue
因为它有很好的性能和好的方法 put and take
正如@Bhargav Modi 指出的那样,代码 运行 涉及编写多线程应用程序的更精细问题(同步块与 - 方法,在关键变量上使用 volatile
声明)。这些问题在测试过程中经常被遗漏,因为需要一些偶然因素才能使问题出现(最臭名昭著的问题之一是 double checked locking)。
这是使用 Java concurrent classes: there is less chance of writing code that is not thread-safe or has multi-threading issues. In your case, the SynchronousQueue 看起来不错的替代品的充分理由。使用 SynchronousQueue,无需使用 empty
变量、this.frameWithMotionData
变量或 wait/notifyAll 机制。