如何在 Liferay 7 中配置消息总线?

How to configure Message Bus In Liferay 7?

我想在 DXP 中使用 Liferay 消息总线。我写了下面的代码。

DemoSender.java

package demo.sender.portlet;

import demo.sender.constants.DemoSenderPortletKeys;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.portlet.bridges.mvc.MVCPortlet;

import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.Portlet;

import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

/**
 * MVC portlet that publishes a demo message on the Liferay message bus.
 *
 * <p>
 * The {@link #sendMessage(ActionRequest, ActionResponse)} method builds a
 * {@link Message} addressed to {@code MyEchoDestination} and hands it to the
 * injected {@link MessageBus}.
 * </p>
 *
 * @author parth.ghiya
 */
@Component(
    immediate = true,
    property = {
        "com.liferay.portlet.display-category=category.sample",
        "com.liferay.portlet.instanceable=true",
        "javax.portlet.display-name=demo-sender Portlet",
        "javax.portlet.init-param.template-path=/",
        "javax.portlet.init-param.view-template=/view.jsp",
        "javax.portlet.name=" + DemoSenderPortletKeys.DemoSender,
        "javax.portlet.resource-bundle=content.Language",
        "javax.portlet.security-role-ref=power-user,user"
    },
    service = Portlet.class
)
public class DemoSenderPortlet extends MVCPortlet {

    /**
     * Sends a "Hello World!" message to the echo destination.
     *
     * <p>
     * Presumably invoked through MVCPortlet's action dispatching when the
     * portlet's button is clicked; neither argument is read here.
     * </p>
     *
     * @param actionRequest the current action request (unused)
     * @param actionResponse the current action response (unused)
     */
    public void sendMessage(
        ActionRequest actionRequest, ActionResponse actionResponse) {

        if (_log.isInfoEnabled()) {
            _log.info("Sending message to DE Echo service");
        }

        Message echoRequest = new Message();

        echoRequest.setPayload(_PAYLOAD);
        echoRequest.setResponseDestinationName(_RESPONSE_DESTINATION_NAME);
        echoRequest.setDestinationName(_DESTINATION_NAME);

        _messageBus.sendMessage(echoRequest.getDestinationName(), echoRequest);
    }

    /**
     * Remembers the bundle context supplied when the component is activated.
     *
     * @param bundleContext the context of the bundle hosting this component
     */
    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;
    }

    // Name of the destination the demo message is sent to.
    private static final String _DESTINATION_NAME = "MyEchoDestination";

    // Payload carried by the demo message.
    private static final String _PAYLOAD = "Hello World!";

    // Name of the destination a reply should be sent to.
    private static final String _RESPONSE_DESTINATION_NAME = "MyEchoResponse";

    private static final Log _log = LogFactoryUtil.getLog(
        DemoSenderPortlet.class);

    // Stored on activation; not read anywhere else in this class.
    private BundleContext _bundleContext;

    @Reference
    private MessageBus _messageBus;

}

DemoReceiver.java

package demo.receiver.portlet;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageListener;

/**
 * Message listener registered for the {@code MyEchoDestination} destination.
 *
 * <p>
 * Each received message and its payload are written to the log at info
 * level. The code that would send a reply is currently disabled.
 * </p>
 */
@Component(
    immediate = true,
    property = {"destination.name=MyEchoDestination"},
    service = MessageListener.class
)
public class DemoReceiverPortlet extends BaseMessageListener {

    /**
     * Logs the incoming message and then its payload.
     *
     * @param message the message delivered to this listener; its payload is
     *        cast to {@code String}
     * @throws Exception if handling the message fails
     */
    @Override
    protected void doReceive(Message message) throws Exception {
        if (_log.isInfoEnabled()) {
            _log.info("Received: " + message);
        }

        // The cast is performed regardless of the log level.
        String text = (String)message.getPayload();

        if (_log.isInfoEnabled()) {
            _log.info("Message payload: " + text);
        }

        /*
         * Disabled reply logic, kept for reference:
         *
         * String responseDestinationName =
         *     message.getResponseDestinationName();
         *
         * if ((responseDestinationName != null) &&
         *     (responseDestinationName.length() > 0)) {
         *
         *     Message responseMessage = new Message();
         *
         *     responseMessage.setDestinationName(responseDestinationName);
         *     responseMessage.setResponseId(message.getResponseId());
         *
         *     // This is just for demo purposes
         *
         *     responseMessage.setPayload(payload);
         *
         *     _messageBus.sendMessage(
         *         message.getResponseDestinationName(), responseMessage);
         * }
         */
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverPortlet.class);

    // Only referenced by the disabled reply logic in doReceive.
    @Reference
    private volatile MessageBus _messageBus;

}

问题是我的 doReceive 方法从未被调用过。 需要进一步添加什么配置?

此致

P.S : 在 DemoSender 中,我在点击按钮时发送了一些消息

编辑#1

我确实添加了如下配置器代码。

package demo.receiver.portlet;

import java.util.Dictionary;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.concurrent.DiscardOldestPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.util.HashMapDictionary;

/**
 * Creates the {@code MyEchoDestination} message bus destination and registers
 * it as an OSGi {@link Destination} service so that listeners declared with
 * {@code destination.name=MyEchoDestination} have a destination to attach to.
 *
 * <p>
 * The component must not be declared with {@code enabled = false}: a disabled
 * Declarative Services component is never activated unless something enables
 * it programmatically, so the destination would never be registered.
 * </p>
 */
@Component(immediate = true, service = DemoReceiverConfigurator.class)
public class DemoReceiverConfigurator {

    /**
     * Builds the destination configuration, creates the destination through
     * the {@link DestinationFactory} and registers it in the service registry.
     *
     * @param componentContext the component context, used to obtain the
     *        bundle context for the service registration
     */
    @Activate
    protected void activate(ComponentContext componentContext) {
        _bundleContext = componentContext.getBundleContext();

        if (_log.isInfoEnabled()) {
            _log.info("Registering destination " + _DESTINATION_NAME);
        }

        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(
                DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
                _DESTINATION_NAME);

        destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE);

        // Log before delegating so that a full queue is visible in the logs.
        // NOTE(review): the wording assumes DiscardOldestPolicy drops the
        // oldest queued task, as its name says - confirm against the
        // Liferay version in use.
        RejectedExecutionHandler rejectedExecutionHandler =
            new DiscardOldestPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The oldest queued task will be discarded " +
                                "because the task queue of " +
                                    _DESTINATION_NAME +
                                        " is at its maximum capacity");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        Destination destination = _destinationFactory.createDestination(
            destinationConfiguration);

        Dictionary<String, Object> destinationProperties =
            new HashMapDictionary<>();

        destinationProperties.put("destination.name", destination.getName());

        _destinationServiceRegistration = _bundleContext.registerService(
            Destination.class, destination, destinationProperties);
    }

    /**
     * Unregisters the destination service, if one was registered, and
     * destroys the destination.
     */
    @Deactivate
    protected void deactivate() {
        if (_destinationServiceRegistration != null) {

            // Look the destination up before unregistering; the service
            // reference is taken from the registration itself.
            Destination destination = _bundleContext.getService(
                _destinationServiceRegistration.getReference());

            _destinationServiceRegistration.unregister();

            destination.destroy();
        }

        _bundleContext = null;
    }

    /**
     * Declares a dependency on the {@link MessageBus} service. The instance
     * is not stored; the reference only makes this component wait until the
     * message bus is available.
     *
     * @param messageBus the message bus service (unused)
     */
    @Reference(unbind = "-")
    protected void setMessageBus(MessageBus messageBus) {
    }

    // Must match the "destination.name" used by the sender and the listener.
    private static final String _DESTINATION_NAME = "MyEchoDestination";

    private static final int _MAXIMUM_QUEUE_SIZE = 200;

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverConfigurator.class);

    private volatile BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private volatile ServiceRegistration<Destination>
        _destinationServiceRegistration;

}

但是我的 activate 方法没有被调用。我在消息侦听器类中设置了 enabled = false,并在配置器类中设置了 enabled = false, immediate = true。

不知道我错过了什么。

在 OSGi 中,这种看似显而易见的配置通常就足够了。但在这种情况下显然不够:Liferay 现在知道您要发送的消息以及您想接收的消息,但消息总线(MessageBus)并不知道需要创建哪个目的地(destination)。

这看起来似乎是显而易见的——如果某条消息有侦听器,那么就应该存在对应的目的地。但它应该是什么类型的呢?并行处理?有多少个并行处理线程?同步?带队列?这些正是您需要自己配置的内容。

虽然快速搜索没有找到有关如何执行此操作的文档,但您可以把这个配置器(configurator)作为示例,来创建缺失的这一环。

MessageBus 文档前几天有所改进,请参阅以下页面:https://dev.liferay.com/develop/tutorials/-/knowledge_base/7-0/message-bus