多线程 UDP 服务器 Java

Multithreaded UDP Server Java

我有两个线程,一个正在监听套接字并添加到队列中,另一个从队列中减去并提交处理。第二个线程在队列为空时休眠。这种睡眠会以某种方式影响第一个线程,也就是说,如果您删除睡眠或使其变大,那么第一个线程的 socket.receive 中的延迟会增加。如果我将睡眠保持得尽可能低,它会变得更好,但并不完美。我做错了什么?

private DatagramSocket socket;
private boolean running, listenthreadwork;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private ConcurrentLinkedQueue<MyObject> list = new ConcurrentLinkedQueue<MyObject>();

public void init(int port) throws SocketException
{
    socket = new DatagramSocket(port);
    
    runnerThread = new Thread(this::listenLoopUDP);
    runnerThread.setName("listenLoopUDP");
    
    listenerThread = new Thread(this::listenerThreadUDP);
    listenerThread.setName("listenerThreadUDP");
    
    running = true;
    listenthreadwork = true;
    runnerThread.start();
    listenerThread.start();
}

private void listenerThreadUDP() {
    while (listenthreadwork == true) {
        MyObjectinfo = null;
        synchronized (list) {
            if (!list.isEmpty()) {
                info = list.poll();
            }
        }
        if (info != null) {
            for (IUDPListener listener : listeners) {
                listener.msgReceived(info);
            }
        } else {
            try {
                Thread.sleep(10);//Somehow affects listenLoopUDP
            } catch (InterruptedException e) {
                Log.write(e);
            }
        }
    }
}

public void listenLoopUDP() {
    while (running) {
        DatagramPacket packet = new DatagramPacket(buf, buf.length);

        try {
            socket.receive(packet);
        } catch (IOException e) {
            if (socket.isClosed()) {
                running = false;
                continue;
            } else {
                Log.write(e);
                socket.close();
            }
        }
        
        String received = new String(packet.getData());

        MyObject info = new MyObject(received);
        synchronized (list) {
            list.offer(info);
        }
    }

    listenthreadwork = false;
    try {
        listenerThread.join();
    } catch (InterruptedException e) {
        Log.write(e);
    }
}

我有一个带计时器的简单客户端

public class TestClient {
    private DatagramSocket socket;
    private InetAddress address;
    private int count = 0;    

    public TestClient() {
        try {
            socket = new DatagramSocket();
        } catch (SocketException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        try {
            address = InetAddress.getByName("localhost");
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void sendEcho(String msg) {
        msg = msg + count++;
        ByteBuffer bb = ByteBuffer.allocate(256);
        bb.order(ByteOrder.LITTLE_ENDIAN);
        bb.put(msg.getBytes());
        DatagramPacket packet = new DatagramPacket(bb.array(), bb.capacity(), address, 15000);
        try {
            socket.send(packet);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void close() {
        socket.close();
    }
}

    TestClient client = new TestClient();
    Timer timer = new Timer(false);
    timer.schedule(new TimerTask() {
    
        @Override
        public void run() {
            client.send("hello");
        }
    
    }, 1, 1);

但在服务器中我收到类似这样的消息
2021-05-17 20:04:48.320
2021-05-17 20:04:48.335
2021-05-17 20:04:48.351
2021-05-17 20:04:48.367
2021-05-17 20:04:48.382

连删不删都无济于事运行listenerThread

        synchronized (list) {
            list.offer(info);
        }

实际上你不需要睡觉,使用适当的队列 类 代替,比如 LinkedBlockingQueue,我也删除了标志,因为你也不需要它们,使用 interrupt() 停止阻塞等待队列元素的线程:

private DatagramSocket socket;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private LinkedBlockingQueue<MyObject> list = new LinkedBlockingQueue<MyObject>();

public void init(int port) throws SocketException
{
    socket = new DatagramSocket(port);
    
    runnerThread = new Thread(this::listenLoopUDP);
    runnerThread.setName("listenLoopUDP");
    
    listenerThread = new Thread(this::listenerThreadUDP);
    listenerThread.setName("listenerThreadUDP");
    
    runnerThread.start();
    listenerThread.start();
}

private void listenerThreadUDP() {
    try {
        while (true) {
            MyObject info=list.take();
            for (IUDPListener listener : listeners) {
                listener.msgReceived(info);
            }
        }
    } catch (InterruptedException ex) {
        //Just quit
    }
}

public void listenLoopUDP() {
    try {
        while (true) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            socket.receive(packet);
            String received = new String(packet.getData());
            MyObject info = new MyObject(received);
            list.put(info);
        }
    } catch (IOException e) {
        Log.write(e);
    } catch (InterruptedException e) {
        Log.write(e);
    } finally {
        //Any exception above (or a runtime one) will activate this block where we do the cleanup and interrupt the other running thread
        listenerThread.interrupt();
        socket.close();
    }
}

我用你的 1ms 客户端做了测试,打印发送和接收的消息,消息之间有完美的交错,所以瓶颈不在接收线程;完美交错我的意思是,在控制台中,如预期的那样,从客户端发送的消息紧跟在接收到的消息之后。