BlockingQueue.size() returns 发布者-订阅者中的大小错误
BlockingQueue.size() returns wrong size in Publisher-Subscribers
我遇到了一个发布者 - 多个订阅者模式的实施问题。发布者使用固定大小的缓冲区并对消息进行排队。消息发送给所有订阅者。订阅者获取消息的顺序必须与发布消息的顺序一致。
我使用 BlockingQueue 来保存发布者消息 (publisherQueue) 并将它们传递给每个订阅者 BlockingQueue (subscriberQueue)。
问题是缓冲区和订阅者工作正常,但缓冲区大小 (publisherQueue.size()) 总是 returns 1.
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
这是我的完整代码:
PublisherSubscriberService.java
package program;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class PublisherSubscriberService {
private int buffer;
private int subscribersNumber;
static Set<subscriber> subscribers = new HashSet<subscriber>();
public PublisherSubscriberService(int buffer, int subscribersNumber) {
this.buffer = buffer;
this.subscribersNumber = subscribersNumber;
}
public void addsubscriber(subscriber subscriber) {
subscribers.add(subscriber);
}
public void start() {
publisher publisher = new publisher(buffer);
System.out.println("publisher started the job");
for (int i = 0; i < subscribersNumber; i++) {
subscriber subscriber = new subscriber(buffer);
subscriber.setName(Integer.toString(i + 1));
subscribers.add(subscriber);
new Thread(subscriber).start();
System.out.println("Subscriber " + subscriber.getName() + " started the job");
}
new Thread(publisher).start();
}
public class Publisher implements Runnable {
private int buffer;
final BlockingQueue<Message> publisherQueue;
public Publisher(int buffer) {
this.buffer = buffer;
publisherQueue = new LinkedBlockingQueue<>(buffer);
}
@Override
public void run() {
for (int i = 1; i < 100; i++) {
Message messageObject = new Message("" + i);
try {
Thread.sleep(50);
publisherQueue.put(messageObject);
System.out.println("Queued message no " + messageObject.getMessage());
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
for (subscriber subscriber : subscribers) {
subscriber.subscriberQueue.put(messageObject);
}
publisherQueue.take();
} catch (InterruptedException e) {
System.out.println("Some error");
e.printStackTrace();
}
}
}
}
class Subscriber implements Runnable {
private String name;
private int buffer;
final BlockingQueue<Message> subscriberQueue;
public Subscriber(int buffer) {
this.buffer = buffer;
subscriberQueue = new LinkedBlockingQueue<>(buffer);
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try {
Message messageObject;
while (true) {
Thread.sleep(100);
messageObject = subscriberQueue.take();
System.out.println(this.getName() + " got message: " + messageObject.getMessage());
}
} catch (InterruptedException e) {
System.out.println("Some error");
e.printStackTrace();
}
}
}
class Message {
private String message;
public Message(String str) {
this.message = str;
}
public String getMessage() {
return message;
}
}
}
PublisherSubscriberProgram.java
package program;
public class ProducerConsumerProgram {
public static void main(String[] args) {
ProducerConsumerService service = new ProducerConsumerService(10, 3);
service.start();
}
}
您的发布商队列中的项目永远不会超过 1 个。每次通过你的循环你放和拿一个项目:
**publisherQueue.put(messageObject);**
System.out.println("Queued message no " + messageObject.getMessage());
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
for (subscriber subscriber : subscribers) {
subscriber.subscriberQueue.put(messageObject);
}
**publisherQueue.take();**
使用您提供的代码,甚至让发布者排队也有意义。
我遇到了一个发布者 - 多个订阅者模式的实施问题。发布者使用固定大小的缓冲区并对消息进行排队。消息发送给所有订阅者。订阅者获取消息的顺序必须与发布消息的顺序一致。
我使用 BlockingQueue 来保存发布者消息 (publisherQueue) 并将它们传递给每个订阅者 BlockingQueue (subscriberQueue)。
问题是缓冲区和订阅者工作正常,但缓冲区大小 (publisherQueue.size()) 总是 returns 1.
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
这是我的完整代码:
PublisherSubscriberService.java
package program;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class PublisherSubscriberService {
private int buffer;
private int subscribersNumber;
static Set<subscriber> subscribers = new HashSet<subscriber>();
public PublisherSubscriberService(int buffer, int subscribersNumber) {
this.buffer = buffer;
this.subscribersNumber = subscribersNumber;
}
public void addsubscriber(subscriber subscriber) {
subscribers.add(subscriber);
}
public void start() {
publisher publisher = new publisher(buffer);
System.out.println("publisher started the job");
for (int i = 0; i < subscribersNumber; i++) {
subscriber subscriber = new subscriber(buffer);
subscriber.setName(Integer.toString(i + 1));
subscribers.add(subscriber);
new Thread(subscriber).start();
System.out.println("Subscriber " + subscriber.getName() + " started the job");
}
new Thread(publisher).start();
}
public class Publisher implements Runnable {
private int buffer;
final BlockingQueue<Message> publisherQueue;
public Publisher(int buffer) {
this.buffer = buffer;
publisherQueue = new LinkedBlockingQueue<>(buffer);
}
@Override
public void run() {
for (int i = 1; i < 100; i++) {
Message messageObject = new Message("" + i);
try {
Thread.sleep(50);
publisherQueue.put(messageObject);
System.out.println("Queued message no " + messageObject.getMessage());
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
for (subscriber subscriber : subscribers) {
subscriber.subscriberQueue.put(messageObject);
}
publisherQueue.take();
} catch (InterruptedException e) {
System.out.println("Some error");
e.printStackTrace();
}
}
}
}
class Subscriber implements Runnable {
private String name;
private int buffer;
final BlockingQueue<Message> subscriberQueue;
public Subscriber(int buffer) {
this.buffer = buffer;
subscriberQueue = new LinkedBlockingQueue<>(buffer);
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
try {
Message messageObject;
while (true) {
Thread.sleep(100);
messageObject = subscriberQueue.take();
System.out.println(this.getName() + " got message: " + messageObject.getMessage());
}
} catch (InterruptedException e) {
System.out.println("Some error");
e.printStackTrace();
}
}
}
class Message {
private String message;
public Message(String str) {
this.message = str;
}
public String getMessage() {
return message;
}
}
}
PublisherSubscriberProgram.java
package program;
public class ProducerConsumerProgram {
public static void main(String[] args) {
ProducerConsumerService service = new ProducerConsumerService(10, 3);
service.start();
}
}
您的发布商队列中的项目永远不会超过 1 个。每次通过你的循环你放和拿一个项目:
**publisherQueue.put(messageObject);**
System.out.println("Queued message no " + messageObject.getMessage());
System.out.println("Actual number of messages in buffer: " + publisherQueue.size());
for (subscriber subscriber : subscribers) {
subscriber.subscriberQueue.put(messageObject);
}
**publisherQueue.take();**
使用您提供的代码,甚至让发布者排队也有意义。