来自服务器通道的端客户端的消息泛滥和 CometD 框架工作的错误消息

Flooding of message at side client from server channel and wrong message with CometD frame work

我正在开发一个客户端-服务器应用程序,我希望在客户端-服务器之间建立持久连接,为此我选择了 CometD 框架。 我成功创建了 CometD 应用程序。

客户-

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;

import com.synacor.idm.auth.LdapAuthenticator;
import com.synacor.idm.resources.LdapResource;

public class CometDClient {
    private volatile BayeuxClient client;
    private final AuthListner authListner = new AuthListner();
    private LdapResource ldapResource;
public static void main(String[] args) throws Exception {

    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.LEVEL", "ERROR");
    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.util.log.announce", "false");
    org.eclipse.jetty.util.log.Log.getRootLogger().setDebugEnabled(false);
    CometDClient client = new CometDClient();
client.run();
}

public void run()  {
    String url = "http://localhost:1010/cometd";
    HttpClient httpClient = new HttpClient();

    try {
        httpClient.start();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    
    client = new BayeuxClient(url, new LongPollingTransport(null, httpClient));
    client.getChannel(Channel.META_HANDSHAKE).addListener(new InitializerListener());
    client.getChannel(Channel.META_CONNECT).addListener(new ConnectionListener());
    client.getChannel("/ldapAuth").addListener(new AuthListner());
    
    
    
    client.handshake();
    boolean success = client.waitFor(1000, BayeuxClient.State.CONNECTED);
    if (!success) {
        System.err.printf("Could not handshake with server at %s%n", url);
        return;
    }

}

private void initialize() {
    client.batch(() -> {

        
        ClientSessionChannel authChannel = client.getChannel("/ldapAuth");
        authChannel.subscribe(authListner);

    });
}

private class InitializerListener implements ClientSessionChannel.MessageListener {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (message.isSuccessful()) {
            initialize();
        }
    }
}

private class ConnectionListener implements ClientSessionChannel.MessageListener {
    private boolean wasConnected;
    private boolean connected;

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (client.isDisconnected()) {
            connected = false;
            connectionClosed();
            return;
        }

        wasConnected = connected;
        connected = message.isSuccessful();
        if (!wasConnected && connected) {
            connectionEstablished();
        } else if (wasConnected && !connected) {
            connectionBroken();
        }
    }
}
private void connectionEstablished() {
    System.err.printf("system: Connection to Server Opened%n");
}

private void connectionClosed() {
    System.err.printf("system: Connection to Server Closed%n");
}

private void connectionBroken() {
    System.err.printf("system: Connection to Server Broken%n");
}


private class AuthListner implements ClientSessionChannel.MessageListener{

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        Object data2 = message.getData();
        System.err.println("Authentication String     " + data2 );
        if(data2 != null && data2.toString().indexOf("=")>0) {
        String[] split = data2.toString().split(",");
        String userString = split[0];
        String passString = split[1];
        String[] splitUser = userString.split("=");
        String[] splitPass = passString.split("=");
        LdapAuthenticator authenticator = new LdapAuthenticator(ldapResource);
        if(authenticator.authenticateToLdap(splitUser[1], splitPass[1])) {
//          client.getChannel("/ldapAuth").publish("200:success from client "+user);
//          channel.publish("200:Success "+user);
            Map<String, Object> data = new HashMap<>();
            // Fill in the structure, for example:
            data.put(splitUser[1], "Authenticated");
            channel.publish(data, publishReply -> {
                if (publishReply.isSuccessful()) {
                    System.out.print("message sent successfully on server");
                }
            });
        }
        }
        
    }
    
}

}

服务器 - 服务 Class

import java.util.List;
import java.util.concurrent.BlockingQueue;

import org.cometd.bayeux.MarkedReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ConfigurableServerChannel;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.ServerMessageImpl;

import com.synacor.idm.resources.AuthenticationResource;
import com.synacor.idm.resources.AuthenticationResource.AuthC;


public class AuthenticationService extends AbstractService implements AuthenticationResource.Listener {

    String authParam;
    BayeuxServer bayeux;
    BlockingQueue<String> sharedResponseQueue;
    public AuthenticationService(BayeuxServer bayeux) {

        super(bayeux, "ldapagentauth");
        addService("/ldapAuth", "ldapAuthentication");  
        this.bayeux = bayeux;
    }
    public void ldapAuthentication(ServerSession session, ServerMessage message) {
        System.err.println("********* inside auth service ***********");
        Object data = message.getData();
        System.err.println("****** got data back from client " +data.toString());
        sharedResponseQueue.add(data.toString());
    }
    @Override
    public void onUpdates(List<AuthC> updates) {
        System.err.println("********* inside auth service listner ***********");

        MarkedReference<ServerChannel> createChannelIfAbsent = bayeux.createChannelIfAbsent("/ldapAuth", new ConfigurableServerChannel.Initializer() {
            public void configureChannel(ConfigurableServerChannel channel)
            {
                channel.setPersistent(true);
                channel.setLazy(true);
            }
        });
        ServerChannel reference = createChannelIfAbsent.getReference();
        for (AuthC authC : updates) {

            authParam = authC.getAuthStr();
            this.sharedResponseQueue= authC.getsharedResponseQueue();
            ServerChannel channel = bayeux.getChannel("/ldapAuth");
            ServerMessageImpl serverMessageImpl = new ServerMessageImpl();
            serverMessageImpl.setData(authParam);

            reference.setBroadcastToPublisher(false);
            reference.publish(getServerSession(), authParam, Promise.noop());
        }

    }


}

事件触发器class-

public class AuthenticationResource implements Runnable{
      private final JerseyClientBuilder clientBuilder;
      private final BlockingQueue<String> sharedQueue; 
      private final BlockingQueue<String> sharedResponseQueue;
      private boolean isAuthCall = false; 
      private String userAuth;
        private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
        Thread runner;

    public AuthenticationResource(JerseyClientBuilder clientBuilder,BlockingQueue<String> sharedQueue,BlockingQueue<String> sharedResponseQueue) {
        super();
        this.clientBuilder = clientBuilder;
        this.sharedQueue = sharedQueue;
        this.sharedResponseQueue= sharedResponseQueue;
          this.runner = new Thread(this);
            this.runner.start();
    }
  public List<Listener> getListeners()
  {
      return listeners;
  }
  

    @Override
    public void run() {
      List<AuthC> updates = new ArrayList<AuthC>();

//      boolean is =  true;
      while(true){
        if(sharedQueue.size()<=0) {
            continue;
        }
          try {
             userAuth  = sharedQueue.take();
             // Notify the listeners
             for (Listener listener : listeners)
               
             {
               updates.add(new AuthC(userAuth,sharedResponseQueue));
                 listener.onUpdates(updates);
             }
             updates.add(new AuthC(userAuth,sharedResponseQueue));
                  System.out.println("****** Auth consume ******** " +  userAuth);

             if(userAuth != null) {
               isAuthCall = true;
             }

          } catch (Exception err) {
             err.printStackTrace();
          break;
          }
//          if (sharedQueue.size()>0) {
//              is = false;
//          }
          
      } 

    }
    
  public static class AuthC
  {
      private final String authStr;
      private final BlockingQueue<String> sharedResponseQueue;

      public AuthC(String authStr,BlockingQueue<String> sharedResponseQueue)
      {
          this.authStr = authStr;
          this.sharedResponseQueue=sharedResponseQueue;

      }


      public String getAuthStr()
      {
          return authStr;
      }

      public BlockingQueue<String> getsharedResponseQueue()
      {
          return sharedResponseQueue;
      }

  }
    
  public interface Listener extends EventListener
  {
      void onUpdates(List<AuthC> updates);
  }

}

我已经成功建立了客户端和服务器之间的连接。 问题 -

1- 当我从服务器向客户端发送消息时,同一条消息被发送了多次。我只期待一种请求-响应机制。 在我的例子中——服务器正在发送用户凭据我期待结果,无论用户是否通过身份验证。

你可以在图像中看到它是如何在客户端用相同的字符串泛滥的 -

2- 客户端和服务器之间的消息循环存在其他问题,我可以通过添加来解决,但消息循环仍有一段时间。

serverChannel.setBroadcastToPublisher(false);

3- 如果我更改服务器上的授权字符串,在客户端它似乎是旧的。 例如-

这就是这三个问题,请指导我,帮助我解决。

CometD 提供 request/response 使用远程调用的消息传递方式,两者都在 client and on the server (you want to use annotated services 服务器上)。

频道 /ldapAuth 有 2 个订阅者:远程客户端(订阅 authChannel.subscribe(...))和服务器端 AuthenticationService(订阅 addService("/ldapAuth", "ldapAuthentication"))。

因此,每次从 AuthenticationService.onUpdates(...) 发布到该频道时,您都会发布到远程客户端,然后返回到 AuthenticationService,这就是调用 setBroadcastToPublisher(false) 有帮助的原因。

对于身份验证消息,最好坚持使用远程调用,因为它们具有自然的 request/response 语义,而不是广播语义。 请阅读 how applications should interact with CometD

关于其他循环,没有CometD触发的循环。 您的应用程序中有循环(在 AuthenticationService.onUpdates(...) 中)并且您从一个队列中获取多次可能具有相同信息的队列(在 AuthenticationResource.run() 中——顺便说一句,它是一个自旋循环,可能会自旋一个CPU 核心利用率达到 100% -- 你应该解决这个问题)。

您看到陈旧数据这一事实可能不是 CometD 的问题,因为 CometD 不会在任何地方存储消息,因此它无法构成用户特定的数据。

我建议您使用远程调用和注释服务清理您的代码。 此外,从自旋循环中清理您自己的代码。

如果按照上面的建议还是有问题,请仔细检查应用程序错误,这不太可能是 CometD 的问题。