Cometd 发布输入数据 Map 而不是等待服务器输出

Cometd publishes input data Map instead of waiting for Output from server

我正在开发一个 Spring-MVC 应用程序,它使用 Cometd 进行聊天。我面临的唯一问题是当我从客户端发送消息时,客户端直接将该消息附加到服务器,而不是等待我在后端调用发布机制然后附加那些东西。我在连接的 2 台不同计算机上进行了尝试,结果相同。因此,我无法访问我在后端设置的变量。

请告诉我我能做什么。非常感谢。

调试日志:

DEBUG: org.cometd.server.BayeuxServerImpl.31913ea0 - <  {data={name=check123}, channel=/chat/1360}
wassup
DEBUG: org.cometd.server.BayeuxServerImpl.411c4c13 - Added channel /chat/1360
DEBUG: org.cometd.server.BayeuxServerImpl.411c4c13 - <  {data={accountid=1360, firstname=AKS, name=check123, channelname=/chat/1360, timestamp=2015-04-27 10:58:08.539}, channel=/chat/1360}
DEBUG: org.cometd.server.BayeuxServerImpl.31913ea0 - << {channel=/chat/1360, id=10, successful=true}
DEBUG: org.cometd.server.BayeuxServerImpl.31913ea0 - <  {channel=/chat/1360, id=10, successful=true}

在上面的调试日志中,第一行来自客户端,语句'wassup' 打印在我的@Listener 方法的开头。接下来,我添加了一些数据并将其发布,但我无法访问第三行提到的任何变量。任何想法。

这是Java代码:

@Named
@Singleton
@Service("chat")
public class ChatServiceImpl{

    @Inject
    private BayeuxServer bayeux;

    @Inject
    private PersonService personService;

    @Inject
    private ChatMessagesService chatService;

    @Inject
    private ConversationService conversationService;

    @Inject
    private RepliesService repliesService;

    @Session
    private ServerSession serverSession;

    @PostConstruct
    public void init(){
        System.out.println("Echo Service Initialized");
    }

    @PreDestroy
    public void cleanUp() throws Exception {
        System.out.println("Spring Container is destroyed");
    }


    @Listener(value = "/person/*")
    public void privateChat(ServerSession remote, ServerMessage.Mutable message){

        System.out.println("wassup");
        Person sender = this.personService.getCurrentlyAuthenticatedUser();
        String senderName = sender.getFirstName();

        Map<String,Object> input = message.getDataAsMap();
        String data = (String) input.get("name");
        String temp = message.getChannel();
        String temp1 = temp;
        temp = temp.replace("/person/","");
        final int conversationId = Integer.valueOf(temp);
        Map<String,Object> output = new HashMap<>();
        output.put("text",data);
        output.put("sender",senderName);
        output.put("channelname",temp);
        output.put("timestamp",new Timestamp(System.currentTimeMillis()));
        bayeux.createChannelIfAbsent(message.getChannel(), channel -> channel.setPersistent(true));
        ServerChannel serverChannel = bayeux.getChannel(message.getChannel());
        serverChannel.publish(remote,output);
        Thread thread = new Thread(() ->{
            Replies replies = new Replies();
            replies.setReplyingPersonName(senderName);
            replies.setReplyText(data);
            this.repliesService.addReply(replies,conversationId, sender);
        });
        thread.start();
    }


    @Listener("/chat/*")
    public void processChat(ServerSession remote, ServerMessage.Mutable message){
        System.out.println("wassup");
        String firstName = this.personService.returnLoggedInUsersName();
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());

        Map<String, Object> input = message.getDataAsMap();
        String data = (String)input.get("name");
        String temp = message.getChannel();
        String temp1 = temp;
        temp = temp.replace("/chat/","");
        final Long groupAccountIdentifier = Long.valueOf(temp);

        Map<String, Object> output = new HashMap<>();
        output.put("name",data);
        output.put("channelname",message.getChannel());
        output.put("firstname",firstName);
        output.put("timestamp",timestamp);
        output.put("accountid",groupAccountIdentifier);

        bayeux.createChannelIfAbsent(message.getChannel(), channel -> channel.setPersistent(true));
        ServerChannel serverChannel = bayeux.getChannel(message.getChannel());
        serverChannel.publish(serverSession,output);

        Thread thread = new Thread(() ->{
            ChatMessages chatMessages = new ChatMessages();
            chatMessages.setFirstName(firstName);
            chatMessages.setChatText(data);
            chatMessages.setChannelName(message.getChannel());
            chatMessages.setTimeStamp(new Timestamp((System.currentTimeMillis())));
            this.chatService.addChatMessage(chatMessages,groupAccountIdentifier);
        });
        thread.start();
    }
}

Java脚本代码:

(function($)
{
    var cometd = $.cometd;

    $(document).ready(function()
    {
        function _connectionEstablished()
        {
            $('#body').append('<div>CometD Connection Established</div>');
        }

        function _connectionBroken()
        {
            $('#body').append('<div>CometD Connection Broken</div>');
        }

        function _connectionClosed()
        {
            $('#body').append('<div>CometD Connection Closed</div>');
        }

        // Function that manages the connection status with the Bayeux server
        var _connected = false;
        function _metaConnect(message)
        {
            if (cometd.isDisconnected())
            {
                _connected = false;
                _connectionClosed();
                return;
            }

            var wasConnected = _connected;
            _connected = message.successful === true;
            if (!wasConnected && _connected)
            {
                _connectionEstablished();
            }
            else if (wasConnected && !_connected)
            {
                _connectionBroken();
            }
        }

        // Function invoked when first contacting the server and
        // when the server has lost the state of this client
        function _metaHandshake(handshake)
        {
            if (handshake.successful === true)
            {
                cometd.batch(function()
                {
                    cometd.subscribe('/chat/1360', function(message)
                    {
                        $('#hello1').append('<div>Server Says: ' + message.data.name + ' ' + ' ' +message.data.firstname+'</div>');
                    });

                });
                cometd.publish('/chat/1360');
            }
        }

        // Disconnect when the page unloads
        $(window).unload(function()
        {
            cometd.disconnect(true);
        });

        var cometURL = location.protocol + "//" + location.host + config.contextPath + "/cometd";

        cometd.configure({
            url: cometURL,
            logLevel: 'debug'
        });

        cometd.addListener('/meta/handshake', _metaHandshake);
        cometd.addListener('/meta/connect', _metaConnect);
        cometd.handshake();
    });
})(jQuery);

如果有人有任何想法,请告诉我我能做些什么。如果需要更多信息,请随时询问。非常感谢。 :-)

更新

按照Sborder第二次的建议,我做了一些修改,但已经部分成功了。我也忘了添加我的 index.jsp,它正在发送实际的短信。

首先,ChatServiceImpl :

     @Session
        private ServerSession serverSession;

  @Listener("/service/chat/{accountid}")
    public void processChat(ServerSession remote, ServerMessage.Mutable message,@Param("accountid")String accountid) {
        System.out.println("wassup and account id is "+accountid);
        String firstName = this.personService.returnLoggedInUsersName();
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());

        Map<String, Object> input = message.getDataAsMap();
        String text = (String) input.get("name");
        String temp = message.getChannel();
 Map<String, Object> data = new HashMap<String,Object>();
        data.put("name", text);
        data.put("channelname", message.getChannel());
        data.put("firstname", firstName);
        data.put("timestamp", timestamp);
     ServerChannel serverChannel = bayeux.createChannelIfAbsent("/chat/1306").getReference();
        serverChannel.setPersistent(true);
        System.out.println("Channel name is "+serverChannel.getChannelId());

       serverChannel.publish(remote, data);
    }

Application.js :

   function _metaHandshake(handshake)
    {
        if (handshake.successful === true)
        {
            cometd.batch(function()
            {
                cometd.subscribe('/chat/1306', function(message){
                    $('.hello1').append('<div>Server Says: ' +message.data.name + ' ' + ' ' +message.data.firstname+'</div>');

                });

            });

        }
    }

index.jsp :

 <div id="body">
    <input id="enterText" type="text" />Enter text
    <input id="sendMessage" type="button"/>
</div>

<div class="hello1">

</div>
<div class="hello123">

</div>

<script type="text/javascript">
    var config = {
        contextPath: '${pageContext.request.contextPath}'
    };
    var cometd = $.cometd;
    $(document).on("click", "#sendMessage", function(){
        var text = $("#enterText").val();
        cometd.publish('/service/chat/1306', { name: text});
    });

所以,如果我在 /service 频道上使用 serverChannel.publish,那么在前端,没有文本附加到服务器。如果我使用 remote.deliver 而不是发布,则会附加正确的文本,但仅附加到当前浏览器中的客户端,而不附加到其他浏览器中的其他客户端。我如何使用 serverChannel.publish 向所有订阅者发送数据,我的意思是正确的数据。

您的 JavaScript 客户端代码以错误的方式发布,只是调用:

cometd.publish('/chat/1360');

缺少您要发送的数据,您至少应该使用一个空对象,如:

cometd.publish('/chat/1360', {});

请注意,由于您的 JavaScript 客户端(发件人)也订阅了频道 /chat/1360,发件人在该频道上发布的任何消息都会 return 返回给发件人本身。这是CometD的default behaviour

最重要的是,在服务器端,您也通过 ServerChannel.publish(...) 在该频道上发布,因此您正在向该频道的订阅者发送 另一条 消息.

不需要你打电话:

bayeux.createChannelIfAbsent(message.getChannel(), channel -> channel.setPersistent(true));

因为此时频道已经存在;你可以打电话给 bayeux.getChannel(message.getChannel()).

你正在做的是向服务器发送一条消息 (messageA),然后你想处理服务器上的 messageA 字段并生成一条新的修改后的消息 (messageB) 广播给所有订阅者,包括原始发送者。

在这种情况下,您最好在 service channel 上发送 messageA,这样它就不会广播给所有订阅者。 MessageA 并非旨在广播,它只是一种向服务器传达您想要执行的操作的方式,服务渠道正是为此目的而存在。

先把你的信息弄好,你会发现其余的一切都会顺利进行。

您可以使用 remote calls 而不是服务渠道,在您想要对发送的消息执行一些服务器端处理的这种特殊情况下,它们使用起来更简单。

最后,请查看 channel parameters,而不是自己进行解析,使用侦听器表示法:

@Listener("/chat/{accountId}")
public void processChat(ServerSession remote, ServerMessage.Mutable message, @Param("accountId") String account)
{
    ...
}