具有消费者-生产者设计的 TCP 套接字服务器 'cpu time limit exceeded'
TCP socket server with consumer-producer design 'cpu time limit exceeded'
这个问题出现在运行一个采用 consumer/producer(消费者/生产者)设计的套接字服务器时:程序崩溃,并在日志中显示错误 cpu time limit exceeded。我还发现当时的 CPU 使用率超过 90%。下面是服务器的代码,它可能出了什么问题?我该如何优化它?
我使用这种 queue(队列)方法,是为了避免为每个请求都创建一个新的线程(thread)。
在main方法中(主线程)
// Shared hand-off queue: the producer enqueues accepted sockets and the consumer dequeues them.
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();
// Producer thread: runs a RequestProducer built around the shared queue.
Thread producer = new Thread(new RequestProducer(queue));
// Consumer thread: runs a RequestConsumer built around the same queue instance.
Thread consumer = new Thread(new RequestConsumer(queue));
// Start both workers; the producer is started first.
producer.start();
consumer.start();
RequestProducer线程
// Queue shared with the consumer; the instance is supplied by the main thread.
// NOTE(review): this excerpt omits the terminating ';' of the field declaration.
ConcurrentLinkedQueue<Socket> queue
// Constructor: stores the shared queue.
public RequestProducer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
// Accept loop: listens on port 19029 and enqueues every accepted connection.
public void run() {
try {
// Create the listening socket on port 19029.
ServerSocket serverSocket = new ServerSocket(19029);
while (true) {
try {
// Block until a client connects.
Socket socket = serverSocket.accept();
// Hand the connection over to the consumer through the shared queue.
queue.offer(socket);
} catch (ConnectException ce) {//handle exception
} catch (SocketException e) {//handle exception
}
}
// NOTE(review): the '}' at the end of the next line is part of the comment, so the catch
// body is closed by the line after it and run() itself is left unclosed in this excerpt.
} catch (IOException ex) {//handle exception}
}
RequestConsumer线程
// Queue shared with the producer; the instance is supplied by the main thread.
// NOTE(review): this excerpt omits the terminating ';' of the field declaration.
ConcurrentLinkedQueue<Socket> queue
// Constructor: stores the shared queue.
public RequestConsumer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
// Drain loop: repeatedly takes a socket from the queue, reads it, closes it and stores the data.
public void run() {
try {
Socket socket = null;
// NOTE(review): poll() never blocks; it returns null at once when the queue is empty, so
// this loop spins without pausing whenever no connection is pending.
while (true) {
// Take the head of the queue (a socket instance), or null if nothing is queued.
socket = queue.poll();
if (null != socket) {
// Read the client's data from the socket's input stream as text.
String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
// Close the socket connection.
socket.close();
// Execute the database insert of the processed data.
excecuteDbInsert(in);
}
}
// NOTE(review): the '}' at the end of the next line is part of the comment, so the catch
// body is closed by the line after it and run() itself is left unclosed in this excerpt.
// Because the catch sits outside the loop, any exception ends the loop.
} catch (IOException | ParseException ex) {//handle exceptions}
}
数据流解析器
/**
 * Reads a single chunk of at most {@code BYTE_STREAM_MAX} bytes from the stream and
 * returns it as text, mapping each byte to the character with the same unsigned value.
 *
 * <p>Only one {@code read} call is made, so the result holds whatever that call
 * delivered; it is empty when the stream is {@code null} or already at end of stream.
 * The stream is always closed before returning, including when reading fails.
 *
 * @param in stream to read from; may be {@code null}
 * @return the bytes read, as a string (never {@code null})
 * @throws IOException if reading or closing the stream fails
 */
public static String parseAsciiSockStream(InputStream in) throws IOException {
    StringBuilder text = new StringBuilder();
    if (in == null) {
        return text.toString();
    }
    // try-with-resources closes the stream even when read() throws.
    try (InputStream stream = in) {
        byte[] buffer = new byte[BYTE_STREAM_MAX];
        // read() returns -1 at end of stream, in which case the loop below does nothing.
        int count = stream.read(buffer);
        for (int i = 0; i < count; i++) {
            // Mask to the unsigned value: a plain cast sign-extends bytes >= 0x80.
            text.append((char) (buffer[i] & 0xFF));
        }
    }
    return text.toString();
}
CPU 时间超限是由消费者中过于激进的 while(true) 循环(忙等待)造成的:队列为空时 poll() 会立即返回 null,循环便不停地空转。下面的示例展示了如何解决这个问题。
您可以在消费者的 while 循环中添加一个简单的 Thread.sleep(1),或者使用 wait/notify 模式来降低 CPU 占用。
RequestProducer线程
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Accepts TCP connections on port 19029 and hands every accepted socket to the
 * consumer through the shared queue, notifying one thread waiting on the queue's
 * monitor after each hand-off.
 */
public class RequestProducer implements Runnable {
    // Queue shared with the consumer; it doubles as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue (supplied by the main thread) that receives accepted sockets
     */
    public RequestProducer(
            ConcurrentLinkedQueue<Socket> queue
    ) {
        this.queue = queue;
    }

    /**
     * Runs the accept loop until the listening socket fails or is closed.
     */
    @Override
    public void run() {
        // try-with-resources releases the listening port however the loop ends.
        try (ServerSocket serverSocket = new ServerSocket(19029)) {
            while (true) {
                try {
                    // Block until a client connects.
                    Socket socket = serverSocket.accept();
                    // Publish the socket first, then wake a waiting consumer.
                    queue.offer(socket);
                    synchronized (queue) {
                        System.out.println("notifying");
                        queue.notify();
                    }
                } catch (SocketException e) {
                    // ConnectException is a subclass of SocketException, so this covers both.
                    System.err.println("Failed to accept connection: " + e);
                    if (serverSocket.isClosed()) {
                        // A closed listener fails on every accept(); stop instead of spinning.
                        return;
                    }
                }
            }
        } catch (IOException ex) {
            System.err.println("Request producer stopped: " + ex);
        }
    }
}
RequestConsumer线程
import java.io.IOException;
import java.net.Socket;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Takes sockets from the shared queue and processes them one at a time, sleeping
 * on the queue's monitor while the queue is empty instead of spinning.
 */
public class RequestConsumer implements Runnable {
    // Queue shared with the producer; it doubles as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue (supplied by the main thread) from which sockets are taken
     */
    public RequestConsumer(
            ConcurrentLinkedQueue<Socket> queue
    ) {
        this.queue = queue;
    }

    /**
     * Processes queued sockets until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                System.out.println("Waiting for new socket");
                Socket socket;
                synchronized (queue) {
                    // Wait only while the queue is really empty. Checking the condition
                    // under the lock means a notify sent while this thread was busy is
                    // not lost, and the loop also absorbs spurious wakeups.
                    while (queue.isEmpty()) {
                        queue.wait();
                    }
                    socket = queue.poll();
                }
                if (socket == null) {
                    // Defensive: something else drained the queue without the lock.
                    continue;
                }
                System.out.println("Acquired new socket");
                handle(socket);
            }
        } catch (InterruptedException e) {
            System.err.println("Request consumer interrupted: " + e);
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
        }
    }

    // Reads one connection, prints its payload and closes the socket exactly once.
    private void handle(Socket socket) {
        try (Socket client = socket) {
            String in = DataStreamUtil.parseAsciiSockStream(client.getInputStream());
            // Placeholder for the real work, e.g. the database insert of the parsed data.
            System.out.println(in);
        } catch (IOException ex) {
            // A failure on one connection must not stop the consumer.
            System.err.println("Failed to process connection: " + ex);
        }
    }
}
数据流解析器
import java.io.IOException;
import java.io.InputStream;
/**
 * Helper for turning the bytes of a socket input stream into text.
 */
public class DataStreamUtil {
    // Maximum number of bytes read per call.
    // NOTE(review): the original value is not shown anywhere in this file; 8192 is an
    // assumption - set it to the protocol's maximum message size.
    private static final int BYTE_STREAM_MAX = 8192;

    /**
     * Reads a single chunk of at most {@code BYTE_STREAM_MAX} bytes from the stream and
     * returns it as text, mapping each byte to the character with the same unsigned value.
     *
     * <p>Only one {@code read} call is made, so the result holds whatever that call
     * delivered; it is empty when the stream is {@code null} or already at end of stream.
     * The stream is always closed before returning, including when reading fails.
     *
     * @param in stream to read from; may be {@code null}
     * @return the bytes read, as a string (never {@code null})
     * @throws IOException if reading or closing the stream fails
     */
    public static String parseAsciiSockStream(InputStream in) throws IOException {
        StringBuilder text = new StringBuilder();
        if (in == null) {
            return text.toString();
        }
        // try-with-resources closes the stream even when read() throws.
        try (InputStream stream = in) {
            byte[] buffer = new byte[BYTE_STREAM_MAX];
            System.out.println("Waiting for input");
            // read() returns -1 at end of stream, in which case the loop below does nothing.
            int count = stream.read(buffer);
            System.out.println("Got input");
            for (int i = 0; i < count; i++) {
                // Mask to the unsigned value: a plain cast sign-extends bytes >= 0x80.
                text.append((char) (buffer[i] & 0xFF));
            }
        }
        return text.toString();
    }
}
这个问题出现在运行一个采用 consumer/producer(消费者/生产者)设计的套接字服务器时:程序崩溃,并在日志中显示错误 cpu time limit exceeded。我还发现当时的 CPU 使用率超过 90%。下面是服务器的代码,它可能出了什么问题?我该如何优化它?
我使用这种 queue(队列)方法,是为了避免为每个请求都创建一个新的线程(thread)。
在main方法中(主线程)
// Shared hand-off queue: the producer enqueues accepted sockets and the consumer dequeues them.
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();
// Producer thread: runs a RequestProducer built around the shared queue.
Thread producer = new Thread(new RequestProducer(queue));
// Consumer thread: runs a RequestConsumer built around the same queue instance.
Thread consumer = new Thread(new RequestConsumer(queue));
// Start both workers; the producer is started first.
producer.start();
consumer.start();
RequestProducer线程
// Queue shared with the consumer; the instance is supplied by the main thread.
// NOTE(review): this excerpt omits the terminating ';' of the field declaration.
ConcurrentLinkedQueue<Socket> queue
// Constructor: stores the shared queue.
public RequestProducer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
// Accept loop: listens on port 19029 and enqueues every accepted connection.
public void run() {
try {
// Create the listening socket on port 19029.
ServerSocket serverSocket = new ServerSocket(19029);
while (true) {
try {
// Block until a client connects.
Socket socket = serverSocket.accept();
// Hand the connection over to the consumer through the shared queue.
queue.offer(socket);
} catch (ConnectException ce) {//handle exception
} catch (SocketException e) {//handle exception
}
}
// NOTE(review): the '}' at the end of the next line is part of the comment, so the catch
// body is closed by the line after it and run() itself is left unclosed in this excerpt.
} catch (IOException ex) {//handle exception}
}
RequestConsumer线程
// Queue shared with the producer; the instance is supplied by the main thread.
// NOTE(review): this excerpt omits the terminating ';' of the field declaration.
ConcurrentLinkedQueue<Socket> queue
// Constructor: stores the shared queue.
public RequestConsumer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}
// Drain loop: repeatedly takes a socket from the queue, reads it, closes it and stores the data.
public void run() {
try {
Socket socket = null;
// NOTE(review): poll() never blocks; it returns null at once when the queue is empty, so
// this loop spins without pausing whenever no connection is pending.
while (true) {
// Take the head of the queue (a socket instance), or null if nothing is queued.
socket = queue.poll();
if (null != socket) {
// Read the client's data from the socket's input stream as text.
String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
// Close the socket connection.
socket.close();
// Execute the database insert of the processed data.
excecuteDbInsert(in);
}
}
// NOTE(review): the '}' at the end of the next line is part of the comment, so the catch
// body is closed by the line after it and run() itself is left unclosed in this excerpt.
// Because the catch sits outside the loop, any exception ends the loop.
} catch (IOException | ParseException ex) {//handle exceptions}
}
数据流解析器
/**
 * Reads a single chunk of at most {@code BYTE_STREAM_MAX} bytes from the stream and
 * returns it as text, mapping each byte to the character with the same unsigned value.
 *
 * <p>Only one {@code read} call is made, so the result holds whatever that call
 * delivered; it is empty when the stream is {@code null} or already at end of stream.
 * The stream is always closed before returning, including when reading fails.
 *
 * @param in stream to read from; may be {@code null}
 * @return the bytes read, as a string (never {@code null})
 * @throws IOException if reading or closing the stream fails
 */
public static String parseAsciiSockStream(InputStream in) throws IOException {
    StringBuilder text = new StringBuilder();
    if (in == null) {
        return text.toString();
    }
    // try-with-resources closes the stream even when read() throws.
    try (InputStream stream = in) {
        byte[] buffer = new byte[BYTE_STREAM_MAX];
        // read() returns -1 at end of stream, in which case the loop below does nothing.
        int count = stream.read(buffer);
        for (int i = 0; i < count; i++) {
            // Mask to the unsigned value: a plain cast sign-extends bytes >= 0x80.
            text.append((char) (buffer[i] & 0xFF));
        }
    }
    return text.toString();
}
CPU 时间超限是由消费者中过于激进的 while(true) 循环(忙等待)造成的:队列为空时 poll() 会立即返回 null,循环便不停地空转。下面的示例展示了如何解决这个问题。
您可以在消费者的 while 循环中添加一个简单的 Thread.sleep(1),或者使用 wait/notify 模式来降低 CPU 占用。
RequestProducer线程
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Accepts TCP connections on port 19029 and hands every accepted socket to the
 * consumer through the shared queue, notifying one thread waiting on the queue's
 * monitor after each hand-off.
 */
public class RequestProducer implements Runnable {
    // Queue shared with the consumer; it doubles as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue (supplied by the main thread) that receives accepted sockets
     */
    public RequestProducer(
            ConcurrentLinkedQueue<Socket> queue
    ) {
        this.queue = queue;
    }

    /**
     * Runs the accept loop until the listening socket fails or is closed.
     */
    @Override
    public void run() {
        // try-with-resources releases the listening port however the loop ends.
        try (ServerSocket serverSocket = new ServerSocket(19029)) {
            while (true) {
                try {
                    // Block until a client connects.
                    Socket socket = serverSocket.accept();
                    // Publish the socket first, then wake a waiting consumer.
                    queue.offer(socket);
                    synchronized (queue) {
                        System.out.println("notifying");
                        queue.notify();
                    }
                } catch (SocketException e) {
                    // ConnectException is a subclass of SocketException, so this covers both.
                    System.err.println("Failed to accept connection: " + e);
                    if (serverSocket.isClosed()) {
                        // A closed listener fails on every accept(); stop instead of spinning.
                        return;
                    }
                }
            }
        } catch (IOException ex) {
            System.err.println("Request producer stopped: " + ex);
        }
    }
}
RequestConsumer线程
import java.io.IOException;
import java.net.Socket;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Takes sockets from the shared queue and processes them one at a time, sleeping
 * on the queue's monitor while the queue is empty instead of spinning.
 */
public class RequestConsumer implements Runnable {
    // Queue shared with the producer; it doubles as the wait/notify monitor.
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue (supplied by the main thread) from which sockets are taken
     */
    public RequestConsumer(
            ConcurrentLinkedQueue<Socket> queue
    ) {
        this.queue = queue;
    }

    /**
     * Processes queued sockets until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                System.out.println("Waiting for new socket");
                Socket socket;
                synchronized (queue) {
                    // Wait only while the queue is really empty. Checking the condition
                    // under the lock means a notify sent while this thread was busy is
                    // not lost, and the loop also absorbs spurious wakeups.
                    while (queue.isEmpty()) {
                        queue.wait();
                    }
                    socket = queue.poll();
                }
                if (socket == null) {
                    // Defensive: something else drained the queue without the lock.
                    continue;
                }
                System.out.println("Acquired new socket");
                handle(socket);
            }
        } catch (InterruptedException e) {
            System.err.println("Request consumer interrupted: " + e);
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
        }
    }

    // Reads one connection, prints its payload and closes the socket exactly once.
    private void handle(Socket socket) {
        try (Socket client = socket) {
            String in = DataStreamUtil.parseAsciiSockStream(client.getInputStream());
            // Placeholder for the real work, e.g. the database insert of the parsed data.
            System.out.println(in);
        } catch (IOException ex) {
            // A failure on one connection must not stop the consumer.
            System.err.println("Failed to process connection: " + ex);
        }
    }
}
数据流解析器
import java.io.IOException;
import java.io.InputStream;
/**
 * Helper for turning the bytes of a socket input stream into text.
 */
public class DataStreamUtil {
    // Maximum number of bytes read per call.
    // NOTE(review): the original value is not shown anywhere in this file; 8192 is an
    // assumption - set it to the protocol's maximum message size.
    private static final int BYTE_STREAM_MAX = 8192;

    /**
     * Reads a single chunk of at most {@code BYTE_STREAM_MAX} bytes from the stream and
     * returns it as text, mapping each byte to the character with the same unsigned value.
     *
     * <p>Only one {@code read} call is made, so the result holds whatever that call
     * delivered; it is empty when the stream is {@code null} or already at end of stream.
     * The stream is always closed before returning, including when reading fails.
     *
     * @param in stream to read from; may be {@code null}
     * @return the bytes read, as a string (never {@code null})
     * @throws IOException if reading or closing the stream fails
     */
    public static String parseAsciiSockStream(InputStream in) throws IOException {
        StringBuilder text = new StringBuilder();
        if (in == null) {
            return text.toString();
        }
        // try-with-resources closes the stream even when read() throws.
        try (InputStream stream = in) {
            byte[] buffer = new byte[BYTE_STREAM_MAX];
            System.out.println("Waiting for input");
            // read() returns -1 at end of stream, in which case the loop below does nothing.
            int count = stream.read(buffer);
            System.out.println("Got input");
            for (int i = 0; i < count; i++) {
                // Mask to the unsigned value: a plain cast sign-extends bytes >= 0x80.
                text.append((char) (buffer[i] & 0xFF));
            }
        }
        return text.toString();
    }
}