Broadcasting data from one socket thread to all existing socket threads in Java
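I have a ServerSocket that listens for connections on a specific address and port and creates a thread for each connection (client) to implement a stateful protocol. Each client's messages are processed in that client's own thread, and this is where my problem is:
1. How can I send data from one thread (client socket) to all threads (simply broadcast a message)? I have seen BlockingQueues used between two threads, but here I need one-to-many broadcasting as well as a one-to-one model for private messages.
2. I have looked at the synchronized-block solution for sharing data or resources between threads, but I don't know where in my code I should implement it, or why.
The server entry point, where the ServerSocket is initialized and listens: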
public class ServerStarter {
// All users created by the client threads are added here.
public static Hashtable<String,User> users = new Hashtable<String,User>();
// All channels are added here.
public static Hashtable<String,Channel> channels = new Hashtable<String,Channel>();
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
public static void main(String args[]){
Config config = new Config();
ServerSocket Server = null;
try {
//server configs,from left to right is: PORT,BackLog,Address
Server = new ServerSocket(config.port, config.backlog,config.ServerIP);
} catch (IOException e) {
System.err.println(e);
}
while (true) {
Socket sock = null;
BufferedReader inFromClient = null;
try {
sock = Server.accept();
} catch (IOException e) {
if (Server != null && !Server.isClosed()) {
try {
Server.close();
} catch (IOException e1)
{
e1.printStackTrace(System.err);
}
}
System.err.println(e);
}
try {
inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
} catch (IOException e) {
System.err.println(e);
}
// each client runs on its own thread
new SocketThread(sock,inFromClient).start();
}
}
}
Where each client socket thread is defined:
public class SocketThread extends Thread {
Socket csocket;
BufferedReader inFromClient;
public SocketThread(Socket csocket, BufferedReader inFromClient) {
this.csocket = csocket;
this.inFromClient = inFromClient;
}
public void run() {
try {
String fromclient = inFromClient.readLine();
// the first line carries initial information sent to the server for further processing
RequestHandler Reqhandler = new RequestHandler(this.getId(), fromclient);
System.out.println("=======================================");
while (true) {
fromclient = inFromClient.readLine();
IRCParser parser = new IRCParser(fromclient);
// if the initial information is OK and nothing kills the thread, the client stays in this infinite loop processing messages
Reqhandler.Commandhandler(parser.getCommand(), parser.getParameters());
}
} catch (IOException e) {
//kill current thread!
currentThread().interrupt();
return;
}
}
}
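If the classes and other data are not enough, tell me and I will add more code and comments. Thanks.
If I understand your requirement correctly, you just need something that passes a message from one client thread to the other client threads. I think you should be able to use the observer pattern here.
Something like this (note that I have removed everything from your code that is not needed to show the message-broadcasting concept; you may need to add it back according to your requirements):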
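import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ServerStarter {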
private static final ServerStarter singleton = new ServerStarter();
private volatile boolean shutdown;
// thread pool executor
private final ExecutorService executorService = Executors.newCachedThreadPool();
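// observable to notify client threads; notifyObservers() does nothing unless setChanged()
// was called first, and setChanged() is protected, so it is invoked from a small subclass
private final Observable observable = new Observable() {
@Override
public void notifyObservers(Object arg) {
setChanged();
super.notifyObservers(arg);
}
};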
// fair lock (can use unfair lock if message broadcasting order is not important)
private final Lock fairLock = new ReentrantLock(true);
private ServerStarter() {
}
public static ServerStarter getInstance() {
return singleton;
}
public static void main(String args[]) {
ServerSocket server = null;
try {
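// server configs, from left to right: port, backlog, address (omitted in this sketch;
// pass them here, because a no-argument ServerSocket is unbound and accept() fails until it is bound)
server = new ServerSocket();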
while (!ServerStarter.getInstance().isShutdown()) {
Socket sock = server.accept();
BufferedReader inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
// each client runs on its own thread
SocketThread clientThread = new SocketThread(sock, inFromClient);
ServerStarter.getInstance().registerClientThread(clientThread);
ServerStarter.getInstance().startClientThread(clientThread);
}
} catch (IOException e) {
if (server != null) {
try {
server.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
e.printStackTrace();
}
}
public void shutdown() {
shutdown = true;
}
public boolean isShutdown() {
return shutdown;
}
public void startClientThread(SocketThread clientThread) {
executorService.submit(clientThread);
}
private void registerClientThread(SocketThread clientThread) {
observable.addObserver(clientThread);
}
public void notifyAllClients(final Object message) {
fairLock.lock();
try {
executorService.submit(new MessageBroadcaster(message));
} finally {
fairLock.unlock();
}
}
public void unregisterClientThread(SocketThread clientThread) {
fairLock.lock();
try {
observable.deleteObserver(clientThread);
} finally {
fairLock.unlock();
}
}
private class MessageBroadcaster implements Runnable {
private final Object message;
public MessageBroadcaster(Object message) {
this.message = message;
}
@Override
public void run() {
fairLock.lock();
try {
observable.notifyObservers(message);
} finally {
fairLock.unlock();
}
}
}
}
class SocketThread implements Runnable, Observer {
Socket clientSocket;
BufferedReader inFromClient;
public SocketThread(Socket clientSocket, BufferedReader inFromClient) {
this.clientSocket = clientSocket;
this.inFromClient = inFromClient;
}
public void run() {
try {
String fromClient;
while (!ServerStarter.getInstance().isShutdown() && (fromClient = inFromClient.readLine()) != null) {
// TODO...prepare message to broadcast
Object message = new Object();
ServerStarter.getInstance().notifyAllClients(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ServerStarter.getInstance().unregisterClientThread(this);
}
}
@Override
public void update(Observable o, Object message) {
// TODO...handle the message
}
}
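When a client thread wants to notify the other client threads, it uses the observable asynchronously to notify them. The observable calls the update() method of each client thread.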
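Note that java.util.Observable has been deprecated since Java 9, and the question also asks about one-to-one private messages. The following is only a minimal sketch of the same register/notify idea built on a ConcurrentHashMap instead of Observable. The names MessageListener, ClientRegistry, broadcast and sendTo are hypothetical and do not come from the code above, and keying clients by a name string is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical callback that each client handler would implement
// (it takes the place of Observer.update() in the code above).
interface MessageListener {
    void onMessage(String message);
}

// Hypothetical registry of connected clients, keyed by an assumed unique name.
class ClientRegistry {
    // ConcurrentHashMap allows clients to register and unregister
    // while another thread is iterating over the map to broadcast.
    private final Map<String, MessageListener> clients = new ConcurrentHashMap<>();

    // Returns false if the name is already taken.
    boolean register(String name, MessageListener listener) {
        return clients.putIfAbsent(name, listener) == null;
    }

    void unregister(String name) {
        clients.remove(name);
    }

    // One-to-many: deliver to every client except the sender.
    // Pass null as senderName to deliver to everyone.
    void broadcast(String senderName, String message) {
        for (Map.Entry<String, MessageListener> entry : clients.entrySet()) {
            if (!entry.getKey().equals(senderName)) {
                entry.getValue().onMessage(message);
            }
        }
    }

    // One-to-one: deliver to a single named client.
    // Returns false if no client with that name is registered.
    boolean sendTo(String recipientName, String message) {
        MessageListener listener = clients.get(recipientName);
        if (listener == null) {
            return false;
        }
        listener.onMessage(message);
        return true;
    }
}
```

In this sketch onMessage() runs on the thread that calls broadcast() or sendTo(), so several threads may end up writing to the same client at once. An implementation of onMessage() that writes to a socket should therefore serialize its writes, for example by putting the message on a per-client BlockingQueue that a single writer thread drains.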