来自服务器通道的端客户端的消息泛滥和 CometD 框架工作的错误消息
Flooding of message at side client from server channel and wrong message with CometD frame work
我正在开发一个客户端-服务器应用程序,我希望在客户端-服务器之间建立持久连接,为此我选择了 CometD 框架。
我成功创建了 CometD 应用程序。
客户-
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import com.synacor.idm.auth.LdapAuthenticator;
import com.synacor.idm.resources.LdapResource;
public class CometDClient {
private volatile BayeuxClient client;
private final AuthListner authListner = new AuthListner();
private LdapResource ldapResource;
public static void main(String[] args) throws Exception {
org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.LEVEL", "ERROR");
org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.util.log.announce", "false");
org.eclipse.jetty.util.log.Log.getRootLogger().setDebugEnabled(false);
CometDClient client = new CometDClient();
client.run();
}
public void run() {
String url = "http://localhost:1010/cometd";
HttpClient httpClient = new HttpClient();
try {
httpClient.start();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
client = new BayeuxClient(url, new LongPollingTransport(null, httpClient));
client.getChannel(Channel.META_HANDSHAKE).addListener(new InitializerListener());
client.getChannel(Channel.META_CONNECT).addListener(new ConnectionListener());
client.getChannel("/ldapAuth").addListener(new AuthListner());
client.handshake();
boolean success = client.waitFor(1000, BayeuxClient.State.CONNECTED);
if (!success) {
System.err.printf("Could not handshake with server at %s%n", url);
return;
}
}
private void initialize() {
client.batch(() -> {
ClientSessionChannel authChannel = client.getChannel("/ldapAuth");
authChannel.subscribe(authListner);
});
}
private class InitializerListener implements ClientSessionChannel.MessageListener {
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
if (message.isSuccessful()) {
initialize();
}
}
}
private class ConnectionListener implements ClientSessionChannel.MessageListener {
private boolean wasConnected;
private boolean connected;
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
if (client.isDisconnected()) {
connected = false;
connectionClosed();
return;
}
wasConnected = connected;
connected = message.isSuccessful();
if (!wasConnected && connected) {
connectionEstablished();
} else if (wasConnected && !connected) {
connectionBroken();
}
}
}
private void connectionEstablished() {
System.err.printf("system: Connection to Server Opened%n");
}
private void connectionClosed() {
System.err.printf("system: Connection to Server Closed%n");
}
private void connectionBroken() {
System.err.printf("system: Connection to Server Broken%n");
}
private class AuthListner implements ClientSessionChannel.MessageListener{
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
Object data2 = message.getData();
System.err.println("Authentication String " + data2 );
if(data2 != null && data2.toString().indexOf("=")>0) {
String[] split = data2.toString().split(",");
String userString = split[0];
String passString = split[1];
String[] splitUser = userString.split("=");
String[] splitPass = passString.split("=");
LdapAuthenticator authenticator = new LdapAuthenticator(ldapResource);
if(authenticator.authenticateToLdap(splitUser[1], splitPass[1])) {
// client.getChannel("/ldapAuth").publish("200:success from client "+user);
// channel.publish("200:Success "+user);
Map<String, Object> data = new HashMap<>();
// Fill in the structure, for example:
data.put(splitUser[1], "Authenticated");
channel.publish(data, publishReply -> {
if (publishReply.isSuccessful()) {
System.out.print("message sent successfully on server");
}
});
}
}
}
}
}
服务器 - 服务 Class
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.cometd.bayeux.MarkedReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ConfigurableServerChannel;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.ServerMessageImpl;
import com.synacor.idm.resources.AuthenticationResource;
import com.synacor.idm.resources.AuthenticationResource.AuthC;
public class AuthenticationService extends AbstractService implements AuthenticationResource.Listener {
String authParam;
BayeuxServer bayeux;
BlockingQueue<String> sharedResponseQueue;
public AuthenticationService(BayeuxServer bayeux) {
super(bayeux, "ldapagentauth");
addService("/ldapAuth", "ldapAuthentication");
this.bayeux = bayeux;
}
public void ldapAuthentication(ServerSession session, ServerMessage message) {
System.err.println("********* inside auth service ***********");
Object data = message.getData();
System.err.println("****** got data back from client " +data.toString());
sharedResponseQueue.add(data.toString());
}
@Override
public void onUpdates(List<AuthC> updates) {
System.err.println("********* inside auth service listner ***********");
MarkedReference<ServerChannel> createChannelIfAbsent = bayeux.createChannelIfAbsent("/ldapAuth", new ConfigurableServerChannel.Initializer() {
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
channel.setLazy(true);
}
});
ServerChannel reference = createChannelIfAbsent.getReference();
for (AuthC authC : updates) {
authParam = authC.getAuthStr();
this.sharedResponseQueue= authC.getsharedResponseQueue();
ServerChannel channel = bayeux.getChannel("/ldapAuth");
ServerMessageImpl serverMessageImpl = new ServerMessageImpl();
serverMessageImpl.setData(authParam);
reference.setBroadcastToPublisher(false);
reference.publish(getServerSession(), authParam, Promise.noop());
}
}
}
事件触发器class-
public class AuthenticationResource implements Runnable{
private final JerseyClientBuilder clientBuilder;
private final BlockingQueue<String> sharedQueue;
private final BlockingQueue<String> sharedResponseQueue;
private boolean isAuthCall = false;
private String userAuth;
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
Thread runner;
public AuthenticationResource(JerseyClientBuilder clientBuilder,BlockingQueue<String> sharedQueue,BlockingQueue<String> sharedResponseQueue) {
super();
this.clientBuilder = clientBuilder;
this.sharedQueue = sharedQueue;
this.sharedResponseQueue= sharedResponseQueue;
this.runner = new Thread(this);
this.runner.start();
}
public List<Listener> getListeners()
{
return listeners;
}
@Override
public void run() {
List<AuthC> updates = new ArrayList<AuthC>();
// boolean is = true;
while(true){
if(sharedQueue.size()<=0) {
continue;
}
try {
userAuth = sharedQueue.take();
// Notify the listeners
for (Listener listener : listeners)
{
updates.add(new AuthC(userAuth,sharedResponseQueue));
listener.onUpdates(updates);
}
updates.add(new AuthC(userAuth,sharedResponseQueue));
System.out.println("****** Auth consume ******** " + userAuth);
if(userAuth != null) {
isAuthCall = true;
}
} catch (Exception err) {
err.printStackTrace();
break;
}
// if (sharedQueue.size()>0) {
// is = false;
// }
}
}
public static class AuthC
{
private final String authStr;
private final BlockingQueue<String> sharedResponseQueue;
public AuthC(String authStr,BlockingQueue<String> sharedResponseQueue)
{
this.authStr = authStr;
this.sharedResponseQueue=sharedResponseQueue;
}
public String getAuthStr()
{
return authStr;
}
public BlockingQueue<String> getsharedResponseQueue()
{
return sharedResponseQueue;
}
}
public interface Listener extends EventListener
{
void onUpdates(List<AuthC> updates);
}
}
我已经成功建立了客户端和服务器之间的连接。
问题 -
1- 当我从服务器向客户端发送消息时,同一条消息被发送了多次。我只期待一种请求-响应机制。
在我的例子中——服务器正在发送用户凭据我期待结果,无论用户是否通过身份验证。
你可以在图像中看到它是如何在客户端用相同的字符串泛滥的 -
2- 客户端和服务器之间的消息循环存在其他问题,我可以通过添加来解决,但消息循环仍有一段时间。
serverChannel.setBroadcastToPublisher(false);
3- 如果我更改服务器上的授权字符串,在客户端它似乎是旧的。
例如-
- 1 个来自服务器的请求 - auth string -> user=foo,pass=bar -> at
客户端 - user=foo,pass=bar
- 2 来自服务器的请求 - auth string user=myuser,pass=mypass ->
在客户端 - user=foo,pass=bar
这就是这三个问题,请指导我,帮助我解决。
CometD 提供 request/response 使用远程调用的消息传递方式,两者都在 client and on the server (you want to use annotated services 服务器上)。
频道 /ldapAuth
有 2 个订阅者:远程客户端(订阅 authChannel.subscribe(...)
)和服务器端 AuthenticationService
(订阅 addService("/ldapAuth", "ldapAuthentication")
)。
因此,每次从 AuthenticationService.onUpdates(...)
发布到该频道时,您都会发布到远程客户端,然后返回到 AuthenticationService
,这就是调用 setBroadcastToPublisher(false)
有帮助的原因。
对于身份验证消息,最好坚持使用远程调用,因为它们具有自然的 request/response 语义,而不是广播语义。
请阅读 how applications should interact with CometD。
关于其他循环,没有CometD触发的循环。
您的应用程序中有循环(在 AuthenticationService.onUpdates(...)
中)并且您从一个队列中获取多次可能具有相同信息的队列(在 AuthenticationResource.run()
中——顺便说一句,它是一个自旋循环,可能会自旋一个CPU 核心利用率达到 100% -- 你应该解决这个问题)。
您看到陈旧数据这一事实可能不是 CometD 的问题,因为 CometD 不会在任何地方存储消息,因此它无法构成用户特定的数据。
我建议您使用远程调用和注释服务清理您的代码。
此外,从自旋循环中清理您自己的代码。
如果按照上面的建议还是有问题,请仔细检查应用程序错误,这不太可能是 CometD 的问题。
我正在开发一个客户端-服务器应用程序,我希望在客户端-服务器之间建立持久连接,为此我选择了 CometD 框架。 我成功创建了 CometD 应用程序。
客户-
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import com.synacor.idm.auth.LdapAuthenticator;
import com.synacor.idm.resources.LdapResource;
public class CometDClient {
private volatile BayeuxClient client;
private final AuthListner authListner = new AuthListner();
private LdapResource ldapResource;
public static void main(String[] args) throws Exception {
org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.LEVEL", "ERROR");
org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.util.log.announce", "false");
org.eclipse.jetty.util.log.Log.getRootLogger().setDebugEnabled(false);
CometDClient client = new CometDClient();
client.run();
}
public void run() {
String url = "http://localhost:1010/cometd";
HttpClient httpClient = new HttpClient();
try {
httpClient.start();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
client = new BayeuxClient(url, new LongPollingTransport(null, httpClient));
client.getChannel(Channel.META_HANDSHAKE).addListener(new InitializerListener());
client.getChannel(Channel.META_CONNECT).addListener(new ConnectionListener());
client.getChannel("/ldapAuth").addListener(new AuthListner());
client.handshake();
boolean success = client.waitFor(1000, BayeuxClient.State.CONNECTED);
if (!success) {
System.err.printf("Could not handshake with server at %s%n", url);
return;
}
}
private void initialize() {
client.batch(() -> {
ClientSessionChannel authChannel = client.getChannel("/ldapAuth");
authChannel.subscribe(authListner);
});
}
private class InitializerListener implements ClientSessionChannel.MessageListener {
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
if (message.isSuccessful()) {
initialize();
}
}
}
private class ConnectionListener implements ClientSessionChannel.MessageListener {
private boolean wasConnected;
private boolean connected;
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
if (client.isDisconnected()) {
connected = false;
connectionClosed();
return;
}
wasConnected = connected;
connected = message.isSuccessful();
if (!wasConnected && connected) {
connectionEstablished();
} else if (wasConnected && !connected) {
connectionBroken();
}
}
}
private void connectionEstablished() {
System.err.printf("system: Connection to Server Opened%n");
}
private void connectionClosed() {
System.err.printf("system: Connection to Server Closed%n");
}
private void connectionBroken() {
System.err.printf("system: Connection to Server Broken%n");
}
private class AuthListner implements ClientSessionChannel.MessageListener{
@Override
public void onMessage(ClientSessionChannel channel, Message message) {
Object data2 = message.getData();
System.err.println("Authentication String " + data2 );
if(data2 != null && data2.toString().indexOf("=")>0) {
String[] split = data2.toString().split(",");
String userString = split[0];
String passString = split[1];
String[] splitUser = userString.split("=");
String[] splitPass = passString.split("=");
LdapAuthenticator authenticator = new LdapAuthenticator(ldapResource);
if(authenticator.authenticateToLdap(splitUser[1], splitPass[1])) {
// client.getChannel("/ldapAuth").publish("200:success from client "+user);
// channel.publish("200:Success "+user);
Map<String, Object> data = new HashMap<>();
// Fill in the structure, for example:
data.put(splitUser[1], "Authenticated");
channel.publish(data, publishReply -> {
if (publishReply.isSuccessful()) {
System.out.print("message sent successfully on server");
}
});
}
}
}
}
}
服务器 - 服务 Class
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.cometd.bayeux.MarkedReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ConfigurableServerChannel;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.ServerMessageImpl;
import com.synacor.idm.resources.AuthenticationResource;
import com.synacor.idm.resources.AuthenticationResource.AuthC;
public class AuthenticationService extends AbstractService implements AuthenticationResource.Listener {
String authParam;
BayeuxServer bayeux;
BlockingQueue<String> sharedResponseQueue;
public AuthenticationService(BayeuxServer bayeux) {
super(bayeux, "ldapagentauth");
addService("/ldapAuth", "ldapAuthentication");
this.bayeux = bayeux;
}
public void ldapAuthentication(ServerSession session, ServerMessage message) {
System.err.println("********* inside auth service ***********");
Object data = message.getData();
System.err.println("****** got data back from client " +data.toString());
sharedResponseQueue.add(data.toString());
}
@Override
public void onUpdates(List<AuthC> updates) {
System.err.println("********* inside auth service listner ***********");
MarkedReference<ServerChannel> createChannelIfAbsent = bayeux.createChannelIfAbsent("/ldapAuth", new ConfigurableServerChannel.Initializer() {
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
channel.setLazy(true);
}
});
ServerChannel reference = createChannelIfAbsent.getReference();
for (AuthC authC : updates) {
authParam = authC.getAuthStr();
this.sharedResponseQueue= authC.getsharedResponseQueue();
ServerChannel channel = bayeux.getChannel("/ldapAuth");
ServerMessageImpl serverMessageImpl = new ServerMessageImpl();
serverMessageImpl.setData(authParam);
reference.setBroadcastToPublisher(false);
reference.publish(getServerSession(), authParam, Promise.noop());
}
}
}
事件触发器class-
public class AuthenticationResource implements Runnable{
private final JerseyClientBuilder clientBuilder;
private final BlockingQueue<String> sharedQueue;
private final BlockingQueue<String> sharedResponseQueue;
private boolean isAuthCall = false;
private String userAuth;
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
Thread runner;
public AuthenticationResource(JerseyClientBuilder clientBuilder,BlockingQueue<String> sharedQueue,BlockingQueue<String> sharedResponseQueue) {
super();
this.clientBuilder = clientBuilder;
this.sharedQueue = sharedQueue;
this.sharedResponseQueue= sharedResponseQueue;
this.runner = new Thread(this);
this.runner.start();
}
public List<Listener> getListeners()
{
return listeners;
}
@Override
public void run() {
List<AuthC> updates = new ArrayList<AuthC>();
// boolean is = true;
while(true){
if(sharedQueue.size()<=0) {
continue;
}
try {
userAuth = sharedQueue.take();
// Notify the listeners
for (Listener listener : listeners)
{
updates.add(new AuthC(userAuth,sharedResponseQueue));
listener.onUpdates(updates);
}
updates.add(new AuthC(userAuth,sharedResponseQueue));
System.out.println("****** Auth consume ******** " + userAuth);
if(userAuth != null) {
isAuthCall = true;
}
} catch (Exception err) {
err.printStackTrace();
break;
}
// if (sharedQueue.size()>0) {
// is = false;
// }
}
}
public static class AuthC
{
private final String authStr;
private final BlockingQueue<String> sharedResponseQueue;
public AuthC(String authStr,BlockingQueue<String> sharedResponseQueue)
{
this.authStr = authStr;
this.sharedResponseQueue=sharedResponseQueue;
}
public String getAuthStr()
{
return authStr;
}
public BlockingQueue<String> getsharedResponseQueue()
{
return sharedResponseQueue;
}
}
public interface Listener extends EventListener
{
void onUpdates(List<AuthC> updates);
}
}
我已经成功建立了客户端和服务器之间的连接。 问题 -
1- 当我从服务器向客户端发送消息时,同一条消息被发送了多次。我只期待一种请求-响应机制。 在我的例子中——服务器正在发送用户凭据我期待结果,无论用户是否通过身份验证。
你可以在图像中看到它是如何在客户端用相同的字符串泛滥的 -
2- 客户端和服务器之间的消息循环存在其他问题,我可以通过添加来解决,但消息循环仍有一段时间。
serverChannel.setBroadcastToPublisher(false);
3- 如果我更改服务器上的授权字符串,在客户端它似乎是旧的。 例如-
- 1 个来自服务器的请求 - auth string -> user=foo,pass=bar -> at 客户端 - user=foo,pass=bar
- 2 来自服务器的请求 - auth string user=myuser,pass=mypass -> 在客户端 - user=foo,pass=bar
这就是这三个问题,请指导我,帮助我解决。
CometD 提供 request/response 使用远程调用的消息传递方式,两者都在 client and on the server (you want to use annotated services 服务器上)。
频道 /ldapAuth
有 2 个订阅者:远程客户端(订阅 authChannel.subscribe(...)
)和服务器端 AuthenticationService
(订阅 addService("/ldapAuth", "ldapAuthentication")
)。
因此,每次从 AuthenticationService.onUpdates(...)
发布到该频道时,您都会发布到远程客户端,然后返回到 AuthenticationService
,这就是调用 setBroadcastToPublisher(false)
有帮助的原因。
对于身份验证消息,最好坚持使用远程调用,因为它们具有自然的 request/response 语义,而不是广播语义。 请阅读 how applications should interact with CometD。
关于其他循环,没有CometD触发的循环。
您的应用程序中有循环(在 AuthenticationService.onUpdates(...)
中)并且您从一个队列中获取多次可能具有相同信息的队列(在 AuthenticationResource.run()
中——顺便说一句,它是一个自旋循环,可能会自旋一个CPU 核心利用率达到 100% -- 你应该解决这个问题)。
您看到陈旧数据这一事实可能不是 CometD 的问题,因为 CometD 不会在任何地方存储消息,因此它无法构成用户特定的数据。
我建议您使用远程调用和注释服务清理您的代码。 此外,从自旋循环中清理您自己的代码。
如果按照上面的建议还是有问题,请仔细检查应用程序错误,这不太可能是 CometD 的问题。