JAVA 中的生产者消费者模式
Producer consumer pattern in JAVA
Here the pattern of consumer-producer
Producer
创建一条新消息,consumer
通过 Broker
监听它们。
到目前为止,producer
创建并发送消息,而 consumer
等待消息。当新消息出现时 consumer
将其打印到屏幕上。
我希望 Producer
被告知 Consumer
已收到消息并将其出队。
请问有什么方法可以从Consumer
变成notify/feedackProducer
?或者我应该通过 Broker
通知 Consumer
?怎么做到的?
我会问制片人为什么需要知道。
producer/consumer 的全部目的是解耦这两个对象。生产者不知道也不关心谁处理消息。
你的提议打破了那个模式。你不应该这样做,除非你有很好的理由 "I want to."
如果必须,请为消费者 class 提供对生产者的引用,并在其上公开可用于发送消息的方法。或者通过让消费者写入生产者也可以查询的 table 来将它们耦合到数据库中。
但现在两人纠缠不清。我认为这是一个糟糕的选择。
JMS 标准支持回复队列 (JMSReplyTo),以便原始消费者可以 return 向特定队列上的原始生产者回复消息。
如图所示;
http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html
如果您还使用 JMSCorrelationID,那么您可以 link 将消息放在一起,以便您知道回复与 ID xxxxx 的特定请求相关。
当然,这意味着消费者将成为回复消息的生产者。
提供的link也有展示工作示例的代码(虽然我自己没试过)
它更像是 Listener 模式。使用 EventBus
。 2 个事件:public PutEvent(MyType message) {...}
、public TakeEvent(long id) {...}
。那么:
EventBus bus = new SimpleEventBus();
bus.addHandler(producerInstance::onMessageTaken, GetEvent.getType());
bus.addHandler(consumerInstance::onMessageArrived, PutEvent.getType());
消费者:
MyType message = queue.pop();
bus.fireEventFromSource(new TakeEvent(message.getId()), this);
...
@Override public void onMessageArrived(PutEvent event) {
// We know, that there're messages out there and can later pop it
System.out.printf("%d from %s", event.getMessage().getId(), event.getSource() != null ? event.getSource().hashCode() + "" : "unknown");
}
制作人:
MyType message = new MyType(data, ++id);
queue.add(message);
bus.fireEventFromSource(new PutEvent(message), this);
...
@Override public void onMessageTaken(TakeEvent event) {
// Wow, who took it?
System.out.printf("%s took %d", event.getSource() != null ? event.getSource().hashCode() + "" : "unknown", event.getId());
}
Here the pattern of consumer-producer
Producer
创建一条新消息,consumer
通过 Broker
监听它们。
到目前为止,producer
创建并发送消息,而 consumer
等待消息。当新消息出现时 consumer
将其打印到屏幕上。
我希望 Producer
被告知 Consumer
已收到消息并将其出队。
请问有什么方法可以从Consumer
变成notify/feedackProducer
?或者我应该通过 Broker
通知 Consumer
?怎么做到的?
我会问制片人为什么需要知道。
producer/consumer 的全部目的是解耦这两个对象。生产者不知道也不关心谁处理消息。
你的提议打破了那个模式。你不应该这样做,除非你有很好的理由 "I want to."
如果必须,请为消费者 class 提供对生产者的引用,并在其上公开可用于发送消息的方法。或者通过让消费者写入生产者也可以查询的 table 来将它们耦合到数据库中。
但现在两人纠缠不清。我认为这是一个糟糕的选择。
JMS 标准支持回复队列 (JMSReplyTo),以便原始消费者可以 return 向特定队列上的原始生产者回复消息。
如图所示;
http://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html
如果您还使用 JMSCorrelationID,那么您可以 link 将消息放在一起,以便您知道回复与 ID xxxxx 的特定请求相关。
当然,这意味着消费者将成为回复消息的生产者。
提供的link也有展示工作示例的代码(虽然我自己没试过)
它更像是 Listener 模式。使用 EventBus
。 2 个事件:public PutEvent(MyType message) {...}
、public TakeEvent(long id) {...}
。那么:
EventBus bus = new SimpleEventBus();
bus.addHandler(producerInstance::onMessageTaken, GetEvent.getType());
bus.addHandler(consumerInstance::onMessageArrived, PutEvent.getType());
消费者:
MyType message = queue.pop();
bus.fireEventFromSource(new TakeEvent(message.getId()), this);
...
@Override public void onMessageArrived(PutEvent event) {
// We know, that there're messages out there and can later pop it
System.out.printf("%d from %s", event.getMessage().getId(), event.getSource() != null ? event.getSource().hashCode() + "" : "unknown");
}
制作人:
MyType message = new MyType(data, ++id);
queue.add(message);
bus.fireEventFromSource(new PutEvent(message), this);
...
@Override public void onMessageTaken(TakeEvent event) {
// Wow, who took it?
System.out.printf("%s took %d", event.getSource() != null ? event.getSource().hashCode() + "" : "unknown", event.getId());
}