如何在 Liferay 7 中配置消息总线?
How to configure Message Bus In Liferay 7?
我想在 DXP 中使用 Liferay 消息总线。我写了下面的代码。
DemoSender.java
package demo.sender.portlet;
import demo.sender.constants.DemoSenderPortletKeys;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.portlet.bridges.mvc.MVCPortlet;
import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.Portlet;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
/**
 * MVC portlet whose {@code sendMessage} action publishes a fixed
 * "Hello World!" payload to the "MyEchoDestination" message bus destination.
 *
 * @author parth.ghiya
 */
@Component(
    immediate = true,
    property = {
        "com.liferay.portlet.display-category=category.sample",
        "com.liferay.portlet.instanceable=true",
        "javax.portlet.display-name=demo-sender Portlet",
        "javax.portlet.init-param.template-path=/",
        "javax.portlet.init-param.view-template=/view.jsp",
        "javax.portlet.name=" + DemoSenderPortletKeys.DemoSender,
        "javax.portlet.resource-bundle=content.Language",
        "javax.portlet.security-role-ref=power-user,user"
    },
    service = Portlet.class
)
public class DemoSenderPortlet extends MVCPortlet {

    /**
     * Portlet action that builds the echo message and hands it to the
     * message bus, addressed to the destination named in the message itself.
     *
     * @param actionRequest the current action request (not read here)
     * @param actionResponse the current action response (not written here)
     */
    public void sendMessage(
            ActionRequest actionRequest, ActionResponse actionResponse) {

        if (_log.isInfoEnabled()) {
            _log.info("Sending message to DE Echo service");
        }

        Message echoMessage = _newEchoMessage();

        _messageBus.sendMessage(echoMessage.getDestinationName(), echoMessage);
    }

    /**
     * Stores the bundle context handed over when the component is activated.
     *
     * @param bundleContext the context of the bundle that owns this component
     */
    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;
    }

    /**
     * Creates the message sent by {@link #sendMessage}: payload
     * "Hello World!", destination "MyEchoDestination", and response
     * destination "MyEchoResponse".
     */
    private static Message _newEchoMessage() {
        Message echoMessage = new Message();

        echoMessage.setDestinationName("MyEchoDestination");
        echoMessage.setPayload("Hello World!");
        echoMessage.setResponseDestinationName("MyEchoResponse");

        return echoMessage;
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoSenderPortlet.class);

    // Written in activate(); not read anywhere else in this class.
    private BundleContext _bundleContext;

    @Reference
    private MessageBus _messageBus;

}
DemoReceiver.java
package demo.receiver.portlet;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageListener;
/**
 * Message listener registered with the service property
 * {@code destination.name=MyEchoDestination}. Each received message is
 * logged at info level, first as a whole and then its payload.
 */
@Component(
    immediate = true,
    property = {"destination.name=MyEchoDestination"},
    service = MessageListener.class
)
public class DemoReceiverPortlet extends BaseMessageListener {

    /**
     * Logs the incoming message and its payload.
     *
     * @param message the message delivered to this listener; its payload is
     *        cast to {@code String}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void doReceive(Message message) throws Exception {
        if (_log.isInfoEnabled()) {
            _log.info("Received: " + message);
        }

        final String text = (String)message.getPayload();

        if (_log.isInfoEnabled()) {
            _log.info("Message payload: " + text);
        }

        // Reply step, currently disabled. When enabled it echoes the payload
        // back to the response destination named in the incoming message:
        //
        // String responseDestinationName =
        //     message.getResponseDestinationName();
        //
        // if ((responseDestinationName != null) &&
        //     (responseDestinationName.length() > 0)) {
        //
        //     Message responseMessage = new Message();
        //
        //     responseMessage.setDestinationName(responseDestinationName);
        //     responseMessage.setResponseId(message.getResponseId());
        //
        //     // This is just for demo purposes
        //     responseMessage.setPayload(payload);
        //
        //     _messageBus.sendMessage(
        //         message.getResponseDestinationName(), responseMessage);
        // }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverPortlet.class);

    // Only referenced by the disabled reply step above.
    @Reference
    private volatile MessageBus _messageBus;

}
问题是我的 doReceive 方法从未被调用过。
需要进一步添加什么配置?
此致
P.S : 在 DemoSender 中,我在点击按钮时发送了一些消息
编辑#1
我确实添加了如下配置器代码。
package demo.receiver.portlet;
import java.util.Dictionary;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import com.liferay.portal.kernel.concurrent.DiscardOldestPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.util.HashMapDictionary;
/**
 * Creates the parallel "MyEchoDestination" message bus destination and
 * publishes it as an OSGi {@code Destination} service while this component is
 * active.
 *
 * <p>
 * The component is deliberately left enabled (the Declarative Services
 * default). An earlier revision declared {@code enabled = false}; a disabled
 * component is never activated unless something enables it explicitly, so
 * {@code activate} was never called and the destination was never created.
 * </p>
 */
@Component(immediate = true, service = DemoReceiverConfigurator.class)
public class DemoReceiverConfigurator {

    /**
     * Builds the destination and registers it as a service carrying the
     * {@code destination.name} property.
     *
     * @param componentContext the component context; only its bundle context
     *        is used
     */
    @Activate
    protected void activate(ComponentContext componentContext) {
        _bundleContext = componentContext.getBundleContext();

        if (_log.isDebugEnabled()) {
            _log.debug("Activating destination " + _DESTINATION_NAME);
        }

        Destination destination = _destinationFactory.createDestination(
            _createDestinationConfiguration());

        Dictionary<String, Object> destinationProperties =
            new HashMapDictionary<>();

        destinationProperties.put("destination.name", destination.getName());

        _destinationServiceRegistration = _bundleContext.registerService(
            Destination.class, destination, destinationProperties);
    }

    /**
     * Unregisters the destination service, if one was registered, and
     * destroys the destination behind it.
     */
    @Deactivate
    protected void deactivate() {
        ServiceRegistration<Destination> registration =
            _destinationServiceRegistration;

        // Clear the field first so a repeated call cannot unregister twice.
        _destinationServiceRegistration = null;

        if (registration != null) {
            Destination destination = _bundleContext.getService(
                registration.getReference());

            registration.unregister();

            // getService may return null; there is nothing to destroy then.
            if (destination != null) {
                destination.destroy();
            }
        }

        _bundleContext = null;
    }

    /**
     * Intentionally empty. NOTE(review): presumably declared only so that the
     * component is not activated before a {@code MessageBus} service is
     * available - confirm.
     */
    @Reference(unbind = "-")
    protected void setMessageBus(MessageBus messageBus) {
    }

    /**
     * Describes the destination: parallel type, bounded queue, and a
     * rejection handler that logs a warning before delegating to
     * {@code DiscardOldestPolicy}.
     */
    private DestinationConfiguration _createDestinationConfiguration() {
        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(
                DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
                _DESTINATION_NAME);

        destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE);

        RejectedExecutionHandler rejectedExecutionHandler =
            new DiscardOldestPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The task queue of destination " +
                                _DESTINATION_NAME +
                                    " is at its maximum capacity; applying " +
                                        "the discard oldest policy");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        return destinationConfiguration;
    }

    // Must match the destination name used by the sender and the listener.
    private static final String _DESTINATION_NAME = "MyEchoDestination";

    private static final int _MAXIMUM_QUEUE_SIZE = 200;

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverConfigurator.class);

    private volatile BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private volatile ServiceRegistration<Destination>
        _destinationServiceRegistration;

}
但是我的 activate 方法没有被调用。我在消息侦听器类中设置了 enabled = false,并在配置器类中设置了 enabled = false、immediate = true。
不知道我错过了什么。
经常在OSGi中,这个看似显而易见的配置就足够了。但在这种情况下,显然不是,因为 Liferay 现在知道您正在发送的消息以及您有兴趣接收的消息,但是 Messagebus 不知道要创建的目的地。
这似乎很明显 - 如果特定消息有侦听器,则可能需要有目的地。但它会是什么类型的呢?并行处理?有多少并行处理程序?同步?排队?这就是您需要做的。
虽然快速搜索没有找到有关如何执行此操作的文档,但您可以使用这个配置器(this configurator)作为示例,来创建缺失的环节。
MessageBus 文档前几天有所改进,请参阅以下页面:https://dev.liferay.com/develop/tutorials/-/knowledge_base/7-0/message-bus
我想在 DXP 中使用 Liferay 消息总线。我写了下面的代码。
DemoSender.java
package demo.sender.portlet;
import demo.sender.constants.DemoSenderPortletKeys;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.portlet.bridges.mvc.MVCPortlet;
import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.Portlet;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
/**
 * MVC portlet whose {@code sendMessage} action publishes a fixed
 * "Hello World!" payload to the "MyEchoDestination" message bus destination.
 *
 * @author parth.ghiya
 */
@Component(
    immediate = true,
    property = {
        "com.liferay.portlet.display-category=category.sample",
        "com.liferay.portlet.instanceable=true",
        "javax.portlet.display-name=demo-sender Portlet",
        "javax.portlet.init-param.template-path=/",
        "javax.portlet.init-param.view-template=/view.jsp",
        "javax.portlet.name=" + DemoSenderPortletKeys.DemoSender,
        "javax.portlet.resource-bundle=content.Language",
        "javax.portlet.security-role-ref=power-user,user"
    },
    service = Portlet.class
)
public class DemoSenderPortlet extends MVCPortlet {

    /**
     * Portlet action that builds the echo message and hands it to the
     * message bus, addressed to the destination named in the message itself.
     *
     * @param actionRequest the current action request (not read here)
     * @param actionResponse the current action response (not written here)
     */
    public void sendMessage(
            ActionRequest actionRequest, ActionResponse actionResponse) {

        if (_log.isInfoEnabled()) {
            _log.info("Sending message to DE Echo service");
        }

        Message echoMessage = _newEchoMessage();

        _messageBus.sendMessage(echoMessage.getDestinationName(), echoMessage);
    }

    /**
     * Stores the bundle context handed over when the component is activated.
     *
     * @param bundleContext the context of the bundle that owns this component
     */
    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;
    }

    /**
     * Creates the message sent by {@link #sendMessage}: payload
     * "Hello World!", destination "MyEchoDestination", and response
     * destination "MyEchoResponse".
     */
    private static Message _newEchoMessage() {
        Message echoMessage = new Message();

        echoMessage.setDestinationName("MyEchoDestination");
        echoMessage.setPayload("Hello World!");
        echoMessage.setResponseDestinationName("MyEchoResponse");

        return echoMessage;
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoSenderPortlet.class);

    // Written in activate(); not read anywhere else in this class.
    private BundleContext _bundleContext;

    @Reference
    private MessageBus _messageBus;

}
DemoReceiver.java
package demo.receiver.portlet;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageListener;
/**
 * Message listener registered with the service property
 * {@code destination.name=MyEchoDestination}. Each received message is
 * logged at info level, first as a whole and then its payload.
 */
@Component(
    immediate = true,
    property = {"destination.name=MyEchoDestination"},
    service = MessageListener.class
)
public class DemoReceiverPortlet extends BaseMessageListener {

    /**
     * Logs the incoming message and its payload.
     *
     * @param message the message delivered to this listener; its payload is
     *        cast to {@code String}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void doReceive(Message message) throws Exception {
        if (_log.isInfoEnabled()) {
            _log.info("Received: " + message);
        }

        final String text = (String)message.getPayload();

        if (_log.isInfoEnabled()) {
            _log.info("Message payload: " + text);
        }

        // Reply step, currently disabled. When enabled it echoes the payload
        // back to the response destination named in the incoming message:
        //
        // String responseDestinationName =
        //     message.getResponseDestinationName();
        //
        // if ((responseDestinationName != null) &&
        //     (responseDestinationName.length() > 0)) {
        //
        //     Message responseMessage = new Message();
        //
        //     responseMessage.setDestinationName(responseDestinationName);
        //     responseMessage.setResponseId(message.getResponseId());
        //
        //     // This is just for demo purposes
        //     responseMessage.setPayload(payload);
        //
        //     _messageBus.sendMessage(
        //         message.getResponseDestinationName(), responseMessage);
        // }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverPortlet.class);

    // Only referenced by the disabled reply step above.
    @Reference
    private volatile MessageBus _messageBus;

}
问题是我的 doReceive 方法从未被调用过。 需要进一步添加什么配置?
此致
P.S : 在 DemoSender 中,我在点击按钮时发送了一些消息
编辑#1
我确实添加了如下配置器代码。
package demo.receiver.portlet;
import java.util.Dictionary;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import com.liferay.portal.kernel.concurrent.DiscardOldestPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.util.HashMapDictionary;
/**
 * Creates the parallel "MyEchoDestination" message bus destination and
 * publishes it as an OSGi {@code Destination} service while this component is
 * active.
 *
 * <p>
 * The component is deliberately left enabled (the Declarative Services
 * default). An earlier revision declared {@code enabled = false}; a disabled
 * component is never activated unless something enables it explicitly, so
 * {@code activate} was never called and the destination was never created.
 * </p>
 */
@Component(immediate = true, service = DemoReceiverConfigurator.class)
public class DemoReceiverConfigurator {

    /**
     * Builds the destination and registers it as a service carrying the
     * {@code destination.name} property.
     *
     * @param componentContext the component context; only its bundle context
     *        is used
     */
    @Activate
    protected void activate(ComponentContext componentContext) {
        _bundleContext = componentContext.getBundleContext();

        if (_log.isDebugEnabled()) {
            _log.debug("Activating destination " + _DESTINATION_NAME);
        }

        Destination destination = _destinationFactory.createDestination(
            _createDestinationConfiguration());

        Dictionary<String, Object> destinationProperties =
            new HashMapDictionary<>();

        destinationProperties.put("destination.name", destination.getName());

        _destinationServiceRegistration = _bundleContext.registerService(
            Destination.class, destination, destinationProperties);
    }

    /**
     * Unregisters the destination service, if one was registered, and
     * destroys the destination behind it.
     */
    @Deactivate
    protected void deactivate() {
        ServiceRegistration<Destination> registration =
            _destinationServiceRegistration;

        // Clear the field first so a repeated call cannot unregister twice.
        _destinationServiceRegistration = null;

        if (registration != null) {
            Destination destination = _bundleContext.getService(
                registration.getReference());

            registration.unregister();

            // getService may return null; there is nothing to destroy then.
            if (destination != null) {
                destination.destroy();
            }
        }

        _bundleContext = null;
    }

    /**
     * Intentionally empty. NOTE(review): presumably declared only so that the
     * component is not activated before a {@code MessageBus} service is
     * available - confirm.
     */
    @Reference(unbind = "-")
    protected void setMessageBus(MessageBus messageBus) {
    }

    /**
     * Describes the destination: parallel type, bounded queue, and a
     * rejection handler that logs a warning before delegating to
     * {@code DiscardOldestPolicy}.
     */
    private DestinationConfiguration _createDestinationConfiguration() {
        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(
                DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
                _DESTINATION_NAME);

        destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE);

        RejectedExecutionHandler rejectedExecutionHandler =
            new DiscardOldestPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The task queue of destination " +
                                _DESTINATION_NAME +
                                    " is at its maximum capacity; applying " +
                                        "the discard oldest policy");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        return destinationConfiguration;
    }

    // Must match the destination name used by the sender and the listener.
    private static final String _DESTINATION_NAME = "MyEchoDestination";

    private static final int _MAXIMUM_QUEUE_SIZE = 200;

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverConfigurator.class);

    private volatile BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private volatile ServiceRegistration<Destination>
        _destinationServiceRegistration;

}
但是我的 activate 方法没有被调用。我在消息侦听器类中设置了 enabled = false,并在配置器类中设置了 enabled = false、immediate = true。
不知道我错过了什么。
经常在OSGi中,这个看似显而易见的配置就足够了。但在这种情况下,显然不是,因为 Liferay 现在知道您正在发送的消息以及您有兴趣接收的消息,但是 Messagebus 不知道要创建的目的地。
这似乎很明显 - 如果特定消息有侦听器,则可能需要有目的地。但它会是什么类型的呢?并行处理?有多少并行处理程序?同步?排队?这就是您需要做的。
虽然快速搜索没有找到有关如何执行此操作的文档,但您可以使用这个配置器(this configurator)作为示例,来创建缺失的环节。
MessageBus 文档前几天有所改进,请参阅以下页面:https://dev.liferay.com/develop/tutorials/-/knowledge_base/7-0/message-bus