多线程 UDP 服务器 Java
Multithreaded UDP Server Java
我有两个线程:一个监听套接字并把收到的消息加入队列,另一个从队列中取出消息并提交处理。第二个线程在队列为空时休眠。这种休眠会以某种方式影响第一个线程,也就是说,如果删除休眠或把休眠时间调大,第一个线程在 socket.receive 上的延迟就会增加。如果我把休眠时间设得尽可能短,情况会有所改善,但仍不完美。我做错了什么?
/** UDP socket the receive loop reads datagrams from; created in init(int). */
private DatagramSocket socket;
/**
 * Loop-control flags. Each is written by one thread and read by another
 * (listenLoopUDP clears listenthreadwork, which listenerThreadUDP polls), so they
 * are volatile to guarantee that an update becomes visible to the reading thread.
 */
private volatile boolean running, listenthreadwork;
/** Receive buffer (256 bytes) reused for every incoming datagram. */
private byte[] buf = new byte[256];
/** Listeners that listenerThreadUDP notifies for each dequeued message. */
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
/** runnerThread executes listenLoopUDP; listenerThread executes listenerThreadUDP. */
private Thread runnerThread, listenerThread;
/** Hand-off queue: filled by listenLoopUDP, drained by listenerThreadUDP. */
private ConcurrentLinkedQueue<MyObject> list = new ConcurrentLinkedQueue<MyObject>();
/**
 * Opens the UDP socket on the given port and launches the two worker threads:
 * one running listenLoopUDP and one running listenerThreadUDP.
 *
 * @param port local UDP port to bind to
 * @throws SocketException if the DatagramSocket cannot be created for that port
 */
public void init(int port) throws SocketException {
    socket = new DatagramSocket(port);

    // Both loops are enabled before either thread is started.
    running = true;
    listenthreadwork = true;

    Thread receiver = new Thread(this::listenLoopUDP);
    receiver.setName("listenLoopUDP");
    runnerThread = receiver;

    Thread dispatcher = new Thread(this::listenerThreadUDP);
    dispatcher.setName("listenerThreadUDP");
    listenerThread = dispatcher;

    receiver.start();
    dispatcher.start();
}
/**
 * Consumer loop run on listenerThread: removes messages from the queue and passes
 * each one to every registered listener. Keeps running until listenthreadwork
 * becomes false.
 */
private void listenerThreadUDP() {
    while (listenthreadwork) {
        // Fixed: this declaration was written as "MyObjectinfo", which does not compile.
        MyObject info = null;
        synchronized (list) {
            if (!list.isEmpty()) {
                info = list.poll();
            }
        }
        if (info != null) {
            for (IUDPListener listener : listeners) {
                listener.msgReceived(info);
            }
        } else {
            // Queue was empty: pause for 10 ms before polling again.
            try {
                Thread.sleep(10);//Somehow affects listenLoopUDP
            } catch (InterruptedException e) {
                Log.write(e);
            }
        }
    }
}
/**
 * Producer loop run on runnerThread: blocks on socket.receive, wraps each datagram's
 * text in a MyObject and appends it to the queue. When the loop ends it clears
 * listenthreadwork and waits for listenerThread to finish.
 */
public void listenLoopUDP() {
    while (running) {
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        try {
            socket.receive(packet);
        } catch (IOException e) {
            if (!socket.isClosed()) {
                // Unexpected I/O failure on an open socket: report it and shut the socket.
                Log.write(e);
                socket.close();
            }
            // Nothing was received, so stop here instead of falling through and
            // enqueuing a message built from whatever is left in the buffer.
            running = false;
            continue;
        }
        // Decode only the bytes of this datagram; buf is reused between packets, so
        // decoding the whole array would append stale bytes to shorter messages.
        String received = new String(packet.getData(), packet.getOffset(), packet.getLength());
        MyObject info = new MyObject(received);
        synchronized (list) {
            list.offer(info);
        }
    }
    listenthreadwork = false;
    try {
        listenerThread.join();
    } catch (InterruptedException e) {
        Log.write(e);
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
    }
}
我有一个带计时器的简单客户端
/**
 * Minimal UDP test client. Every sendEcho call sends one fixed-size 256-byte datagram
 * to port 15000 on localhost, containing the message followed by a running counter.
 */
public class TestClient {
    private final DatagramSocket socket;
    private final InetAddress address;
    private int count = 0;

    /**
     * Resolves localhost and opens an unbound-port UDP socket.
     *
     * @throws IllegalStateException if the address cannot be resolved or the socket
     *         cannot be opened; failing here avoids a later NullPointerException
     *         in sendEcho or close
     */
    public TestClient() {
        try {
            // Resolve first so that a failure cannot leave an open socket behind.
            address = InetAddress.getByName("localhost");
            socket = new DatagramSocket();
        } catch (UnknownHostException | SocketException e) {
            throw new IllegalStateException("Cannot set up UDP test client", e);
        }
    }

    /**
     * Sends {@code msg} with the current counter value appended, zero-padded to
     * 256 bytes. Send failures are printed and otherwise ignored.
     *
     * @param msg message text; encoded bytes beyond 256 are truncated
     */
    public void sendEcho(String msg) {
        msg = msg + count++;
        byte[] payload = msg.getBytes();
        ByteBuffer bb = ByteBuffer.allocate(256);
        // Copy at most capacity() bytes so a long message is truncated instead of
        // making put() throw BufferOverflowException.
        bb.put(payload, 0, Math.min(payload.length, bb.capacity()));
        DatagramPacket packet = new DatagramPacket(bb.array(), bb.capacity(), address, 15000);
        try {
            socket.send(packet);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the underlying socket. */
    public void close() {
        socket.close();
    }
}
// Test driver: sends "hello<counter>" to the server once per millisecond.
TestClient client = new TestClient();
Timer timer = new Timer(false); // false = the timer thread is not a daemon
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        // Fixed: TestClient declares sendEcho(String); it has no send(String) method.
        client.sendEcho("hello");
    }
}, 1, 1); // initial delay 1 ms, then repeat every 1 ms
但在服务器中我收到类似这样的消息
2021-05-17 20:04:48.320
2021-05-17 20:04:48.335
2021-05-17 20:04:48.351
2021-05-17 20:04:48.367
2021-05-17 20:04:48.382
无论是删除下面这段同步代码,还是不运行 listenerThread,都无济于事:
// Excerpt from listenLoopUDP: the received message is appended to the shared queue
// while holding the queue object's monitor.
synchronized (list) {
list.offer(info);
}
实际上你根本不需要休眠,改用合适的队列类即可,比如 LinkedBlockingQueue;我还删除了那些标志位,因为它们同样没有必要——使用 interrupt() 来停止阻塞等待队列元素的线程:
private DatagramSocket socket;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private LinkedBlockingQueue<MyObject> list = new LinkedBlockingQueue<MyObject>();
/**
 * Opens the UDP socket on the given port and launches the two worker threads:
 * one running listenLoopUDP and one running listenerThreadUDP.
 *
 * @param port local UDP port to bind to
 * @throws SocketException if the DatagramSocket cannot be created for that port
 */
public void init(int port) throws SocketException {
    socket = new DatagramSocket(port);

    Thread receiver = new Thread(this::listenLoopUDP);
    receiver.setName("listenLoopUDP");
    runnerThread = receiver;

    Thread dispatcher = new Thread(this::listenerThreadUDP);
    dispatcher.setName("listenerThreadUDP");
    listenerThread = dispatcher;

    receiver.start();
    dispatcher.start();
}
/**
 * Consumer loop run on listenerThread: blocks on the queue until a message is
 * available, then passes it to every registered listener. The loop only ends when
 * the blocking take() is interrupted.
 */
private void listenerThreadUDP() {
    try {
        for (;;) {
            final MyObject message = list.take();
            for (IUDPListener subscriber : listeners) {
                subscriber.msgReceived(message);
            }
        }
    } catch (InterruptedException ignored) {
        // Interruption is the stop signal for this thread; fall through and return.
    }
}
/**
 * Producer loop run on runnerThread: blocks on socket.receive, wraps each datagram's
 * text in a MyObject and puts it on the queue. Any exception ends the loop; the
 * finally block then interrupts listenerThread and closes the socket.
 */
public void listenLoopUDP() {
    try {
        while (true) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            socket.receive(packet);
            // Decode only the bytes of this datagram; buf is reused between packets, so
            // decoding the whole array would append stale bytes to shorter messages.
            String received = new String(packet.getData(), packet.getOffset(), packet.getLength());
            MyObject info = new MyObject(received);
            list.put(info);
        }
    } catch (IOException e) {
        Log.write(e);
    } catch (InterruptedException e) {
        Log.write(e);
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
    } finally {
        //Any exception above (or a runtime one) will activate this block where we do the cleanup and interrupt the other running thread
        listenerThread.interrupt();
        socket.close();
    }
}
我用你的 1ms 客户端做了测试,并打印了发送和接收的消息:两者完美交错,所以瓶颈不在接收线程。所谓完美交错,是指在控制台中,正如预期的那样,客户端每发送一条消息,紧接着就是对应的一条接收到的消息。
我有两个线程:一个监听套接字并把收到的消息加入队列,另一个从队列中取出消息并提交处理。第二个线程在队列为空时休眠。这种休眠会以某种方式影响第一个线程,也就是说,如果删除休眠或把休眠时间调大,第一个线程在 socket.receive 上的延迟就会增加。如果我把休眠时间设得尽可能短,情况会有所改善,但仍不完美。我做错了什么?
/** UDP socket the receive loop reads datagrams from; created in init(int). */
private DatagramSocket socket;
/**
 * Loop-control flags. Each is written by one thread and read by another
 * (listenLoopUDP clears listenthreadwork, which listenerThreadUDP polls), so they
 * are volatile to guarantee that an update becomes visible to the reading thread.
 */
private volatile boolean running, listenthreadwork;
/** Receive buffer (256 bytes) reused for every incoming datagram. */
private byte[] buf = new byte[256];
/** Listeners that listenerThreadUDP notifies for each dequeued message. */
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
/** runnerThread executes listenLoopUDP; listenerThread executes listenerThreadUDP. */
private Thread runnerThread, listenerThread;
/** Hand-off queue: filled by listenLoopUDP, drained by listenerThreadUDP. */
private ConcurrentLinkedQueue<MyObject> list = new ConcurrentLinkedQueue<MyObject>();
/**
 * Opens the UDP socket on the given port and launches the two worker threads:
 * one running listenLoopUDP and one running listenerThreadUDP.
 *
 * @param port local UDP port to bind to
 * @throws SocketException if the DatagramSocket cannot be created for that port
 */
public void init(int port) throws SocketException {
    socket = new DatagramSocket(port);

    // Both loops are enabled before either thread is started.
    running = true;
    listenthreadwork = true;

    Thread receiver = new Thread(this::listenLoopUDP);
    receiver.setName("listenLoopUDP");
    runnerThread = receiver;

    Thread dispatcher = new Thread(this::listenerThreadUDP);
    dispatcher.setName("listenerThreadUDP");
    listenerThread = dispatcher;

    receiver.start();
    dispatcher.start();
}
/**
 * Consumer loop run on listenerThread: removes messages from the queue and passes
 * each one to every registered listener. Keeps running until listenthreadwork
 * becomes false.
 */
private void listenerThreadUDP() {
    while (listenthreadwork) {
        // Fixed: this declaration was written as "MyObjectinfo", which does not compile.
        MyObject info = null;
        synchronized (list) {
            if (!list.isEmpty()) {
                info = list.poll();
            }
        }
        if (info != null) {
            for (IUDPListener listener : listeners) {
                listener.msgReceived(info);
            }
        } else {
            // Queue was empty: pause for 10 ms before polling again.
            try {
                Thread.sleep(10);//Somehow affects listenLoopUDP
            } catch (InterruptedException e) {
                Log.write(e);
            }
        }
    }
}
/**
 * Producer loop run on runnerThread: blocks on socket.receive, wraps each datagram's
 * text in a MyObject and appends it to the queue. When the loop ends it clears
 * listenthreadwork and waits for listenerThread to finish.
 */
public void listenLoopUDP() {
    while (running) {
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        try {
            socket.receive(packet);
        } catch (IOException e) {
            if (!socket.isClosed()) {
                // Unexpected I/O failure on an open socket: report it and shut the socket.
                Log.write(e);
                socket.close();
            }
            // Nothing was received, so stop here instead of falling through and
            // enqueuing a message built from whatever is left in the buffer.
            running = false;
            continue;
        }
        // Decode only the bytes of this datagram; buf is reused between packets, so
        // decoding the whole array would append stale bytes to shorter messages.
        String received = new String(packet.getData(), packet.getOffset(), packet.getLength());
        MyObject info = new MyObject(received);
        synchronized (list) {
            list.offer(info);
        }
    }
    listenthreadwork = false;
    try {
        listenerThread.join();
    } catch (InterruptedException e) {
        Log.write(e);
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
    }
}
我有一个带计时器的简单客户端
/**
 * Minimal UDP test client. Every sendEcho call sends one fixed-size 256-byte datagram
 * to port 15000 on localhost, containing the message followed by a running counter.
 */
public class TestClient {
    private final DatagramSocket socket;
    private final InetAddress address;
    private int count = 0;

    /**
     * Resolves localhost and opens an unbound-port UDP socket.
     *
     * @throws IllegalStateException if the address cannot be resolved or the socket
     *         cannot be opened; failing here avoids a later NullPointerException
     *         in sendEcho or close
     */
    public TestClient() {
        try {
            // Resolve first so that a failure cannot leave an open socket behind.
            address = InetAddress.getByName("localhost");
            socket = new DatagramSocket();
        } catch (UnknownHostException | SocketException e) {
            throw new IllegalStateException("Cannot set up UDP test client", e);
        }
    }

    /**
     * Sends {@code msg} with the current counter value appended, zero-padded to
     * 256 bytes. Send failures are printed and otherwise ignored.
     *
     * @param msg message text; encoded bytes beyond 256 are truncated
     */
    public void sendEcho(String msg) {
        msg = msg + count++;
        byte[] payload = msg.getBytes();
        ByteBuffer bb = ByteBuffer.allocate(256);
        // Copy at most capacity() bytes so a long message is truncated instead of
        // making put() throw BufferOverflowException.
        bb.put(payload, 0, Math.min(payload.length, bb.capacity()));
        DatagramPacket packet = new DatagramPacket(bb.array(), bb.capacity(), address, 15000);
        try {
            socket.send(packet);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the underlying socket. */
    public void close() {
        socket.close();
    }
}
// Test driver: sends "hello<counter>" to the server once per millisecond.
TestClient client = new TestClient();
Timer timer = new Timer(false); // false = the timer thread is not a daemon
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        // Fixed: TestClient declares sendEcho(String); it has no send(String) method.
        client.sendEcho("hello");
    }
}, 1, 1); // initial delay 1 ms, then repeat every 1 ms
但在服务器中我收到类似这样的消息
2021-05-17 20:04:48.320
2021-05-17 20:04:48.335
2021-05-17 20:04:48.351
2021-05-17 20:04:48.367
2021-05-17 20:04:48.382
无论是删除下面这段同步代码,还是不运行 listenerThread,都无济于事:
// Excerpt from listenLoopUDP: the received message is appended to the shared queue
// while holding the queue object's monitor.
synchronized (list) {
list.offer(info);
}
实际上你根本不需要休眠,改用合适的队列类即可,比如 LinkedBlockingQueue;我还删除了那些标志位,因为它们同样没有必要——使用 interrupt() 来停止阻塞等待队列元素的线程:
private DatagramSocket socket;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private LinkedBlockingQueue<MyObject> list = new LinkedBlockingQueue<MyObject>();
/**
 * Opens the UDP socket on the given port and launches the two worker threads:
 * one running listenLoopUDP and one running listenerThreadUDP.
 *
 * @param port local UDP port to bind to
 * @throws SocketException if the DatagramSocket cannot be created for that port
 */
public void init(int port) throws SocketException {
    socket = new DatagramSocket(port);

    Thread receiver = new Thread(this::listenLoopUDP);
    receiver.setName("listenLoopUDP");
    runnerThread = receiver;

    Thread dispatcher = new Thread(this::listenerThreadUDP);
    dispatcher.setName("listenerThreadUDP");
    listenerThread = dispatcher;

    receiver.start();
    dispatcher.start();
}
/**
 * Consumer loop run on listenerThread: blocks on the queue until a message is
 * available, then passes it to every registered listener. The loop only ends when
 * the blocking take() is interrupted.
 */
private void listenerThreadUDP() {
    try {
        for (;;) {
            final MyObject message = list.take();
            for (IUDPListener subscriber : listeners) {
                subscriber.msgReceived(message);
            }
        }
    } catch (InterruptedException ignored) {
        // Interruption is the stop signal for this thread; fall through and return.
    }
}
/**
 * Producer loop run on runnerThread: blocks on socket.receive, wraps each datagram's
 * text in a MyObject and puts it on the queue. Any exception ends the loop; the
 * finally block then interrupts listenerThread and closes the socket.
 */
public void listenLoopUDP() {
    try {
        while (true) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            socket.receive(packet);
            // Decode only the bytes of this datagram; buf is reused between packets, so
            // decoding the whole array would append stale bytes to shorter messages.
            String received = new String(packet.getData(), packet.getOffset(), packet.getLength());
            MyObject info = new MyObject(received);
            list.put(info);
        }
    } catch (IOException e) {
        Log.write(e);
    } catch (InterruptedException e) {
        Log.write(e);
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
    } finally {
        //Any exception above (or a runtime one) will activate this block where we do the cleanup and interrupt the other running thread
        listenerThread.interrupt();
        socket.close();
    }
}
我用你的 1ms 客户端做了测试,并打印了发送和接收的消息:两者完美交错,所以瓶颈不在接收线程。所谓完美交错,是指在控制台中,正如预期的那样,客户端每发送一条消息,紧接着就是对应的一条接收到的消息。