Java 多线程服务器-客户端通信

Java multi-threaded server-client communication

我遇到了一个令人沮丧的问题,我正在尝试做一个问题来准备大学考试 - 应该有一堆客户端通过单独服务器上的 tcp 套接字访问资源,这同步访问。我有一个基本的入口点 class,主要是实例化客户端或服务器,还有一个 class 用于序列化并通过套接字传递的对象:

DataPacket.class 来源:

import java.io.*;
import java.util.Date;

public class DataPacket implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    final public String payload;
    final public String creator;
    final public Date timestamp;

    public DataPacket(String creator, String payload){
        this.creator = creator;
        this.payload = payload;
        this.timestamp = new Date();
    }

}

client.class 来源:

import java.io.*;
import java.net.*;
import java.util.Random;

public class client extends Thread implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private String name;
    public client(String name){
        this.name = name;
    }
    public void run(){
        try {
            Socket sock = new Socket(InetAddress.getLocalHost(), 12345);
            ObjectInputStream in = new ObjectInputStream(sock.getInputStream());
            ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream());
            Random rand = new Random();

// obviously the only reason to have this loop here is to initiate a bunch of access 
// requests to the server, but for debugging purposes I've removed the extra cycles
// and stuck to only one;

            for(int i=0; i<1; i++){ 
                DataPacket data = new DataPacket(this.name, Integer.toString(rand.nextInt()));
                out.writeObject(data);
                out.flush();
            }
            sock.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

最后 server.class 来源在这里:

import java.io.*;
import java.net.*;

public class server extends Thread{
    private int port;
    private DataPacket dp;
    public boolean running=false;

    public void updateData(DataPacket dp){
        this.dp=dp;
    }

    public server(int port){
        this.port = port;
        }

    public void run(){
        this.running = true;
        while (running){
            openSocket();
            }
        }

    public void openSocket(){
        try {
            ServerSocket s = new ServerSocket(this.port, 50, InetAddress.getLocalHost());
            while (true){
                Socket incomming = s.accept();
                ObjectInputStream in = new ObjectInputStream(incomming.getInputStream());
                ObjectOutputStream out = new ObjectOutputStream(incomming.getOutputStream());
                synchronized (dp){
                    while ((dp = (DataPacket) in.readObject()) != null) {
                        System.out.println(dp.creator + "passed a frame @" + dp.timestamp + " :"+dp.payload);
                    } while (in.readObject() == null){
                        Thread.sleep(100);
                        System.out.print(".");
                    }
                }
                s.close();
            }           
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

现在我不确定为什么,但显然客户端也需要定义一个输入流(不做任何从服务器到客户端的回话,但它爆炸了一堆拒绝连接异常,除非我做声明一个。为了安全起见,我在服务器上声明了一个输出以防万一。不幸的是,尽管客户端似乎没有通过套接字传递对象,但服务器没有得到它,或者我在序列化和反序列化方面惨遭失败..

当我在做的时候,我希望能得到一些关于服务器上数据访问同步的建议 - 这 "synchronised (dp)" 能完成这项工作吗,或者我是否需要将线程客户端手动进入等待队列并使用 notify() 手动取出其中一个以传输其序列化数据?

提前致谢! vlex

好吧,我花了几天额外的时间来研究它并重写了几次,现在可以了。这是我的启用多线程的服务器示例:

serverentry.class:

package server2;

public class serverentry {

    public static void main(String[] args) {
        server[] serv = new server[10];
        for (int i=0; i<10; i++){
            serv[i] = new server(12345+i);
            serv[i].start();
        }
    }
}

server.class:

package server2;

import java.io.*;
import java.net.*;
import java.util.*;

public class server extends Thread implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private int port;
    private static String data = "nothing yet";
    public server(int port){
        this.port = port;
        System.out.println("Server thread["+(this.port-12345)+"] started");
    }

    public void run(){
        try{
            ServerSocket sock = new ServerSocket(this.port, 50, InetAddress.getLocalHost());
            Socket accepted = sock.accept();                
            BufferedReader br = new BufferedReader(new InputStreamReader(accepted.getInputStream()));
            while(br.readLine() !=null){
                synchronized(data){
                    updateData(br.readLine());
                }
                if (br.readLine() == null) break;
            }
            sock.close();
        } catch (Exception e) {
            e.printStackTrace();
        }       
    }

    private void updateData(String streamObj){
        String[] streamArr = streamObj.split("&");
        data = streamArr[1];
        System.out.println("Data updated to "+data+" by "+streamArr[0]+" @ "+ new Date());
    }
}

cliententry.class:

package client2;

public class cliententry {

    public static void main(String[] args) {
        client2[] cli = new client2[10];
        for(int i=0; i<10; i++){
            cli[i] = new client2(i);
            cli[i].start();
        }
    }
}

client2.class:

package client2;

import java.io.*;
import java.net.*;
import java.util.*;

public class client2 extends Thread implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private String payload;
    private int number; 
    private Random rand;
    public client2(int num){
        rand = new Random();
        this.number = num;
    }

    public void run(){
        try{
            Socket sock = new Socket(InetAddress.getLocalHost(),(12345+this.number));
            PrintWriter pw = new PrintWriter(new OutputStreamWriter(sock.getOutputStream()));
            for (int i=0; i<30; i++){
                String random = Integer.toString(this.rand.nextInt());
                this.payload = "thread[" + this.number + "]&"+random; 
                pw.write(this.payload);
                pw.write("\n");
                pw.flush();
                }
            sock.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

还有一次我重写了它,我使用了 运行() 方法而不是 start() ,这后来解释了为什么我只得到大约 10 个更新中的 3 个 - 运行 在同一个线程中启动逻辑,同时 start 实例化另一个线程。现在一切都像瑞士钟一样工作了:)

干杯, vlex