调用多个单独的线程 asynchronously.But 一个线程卡在循环中,另一个线程不执行

Calling multiple individual thread asynchronously.But one thread stuck in loop and another thread doesn't execute

这里我有一个服务器和客户端架构服务器将无限循环发送消息。在客户端首先是一个线程,它将与服务器建立连接,它还将创建两个不同的线程,第一个线程应该从服务器读取数据并插入到队列中,第二个线程应该从中读取数据排队。

我已经尝试在第一个线程中调用第二个线程以确认函数完全正常工作。但这不是我想要的。 我也试过打破循环并再次调用递归插入的方法,这对我也不起作用。

这是我正在使用的三个 类。

 Server.class
package threadPool.poc;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

    public static void main(String[] args) {
        try {
            ServerSocket ss = new ServerSocket(8080);
            Socket s = ss.accept();
            OutputStream outputStream = s.getOutputStream();
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputStream);
            BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);
            System.out.println("Server Started");
            int i = 1;
            while (true) {
                System.out.println("Writing :String" + i);
                bufferedWriter.write("String " + i + "\n");
                bufferedWriter.flush();
                Thread.sleep(1000);
                i++;
            }

        } catch (Exception e) {
            System.out.println(e);
        } finally {

        }
    }

}

QueueImplementation.class

package threadPool.poc;

import java.io.UnsupportedEncodingException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueImplementation {

    public Queue<byte[]> threadQueue = new ConcurrentLinkedQueue<>();

    public void insertInQueue(byte[] value) {
        boolean isInserted = threadQueue.add(value);
        if (isInserted) {
            System.out.println("Inserted in a queue");
        } else {
            System.out.println("Cannot insert");
        }

    }

    public String retriveFromQueue() {
        try {
            byte[] data = threadQueue.poll();
            if (data == null) {
                System.out.println("Doesn't have any data for threadNumber ");
                return null;
            }
            System.out.println("Data in the form of byte array for threadNumber :" + data);
            String s = new String(data, "UTF-8");
            System.out.println(s);
            return s;
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

}

Client.class

package threadPool.poc;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class Client implements Runnable {

    public Client() {
        super();
    }

    static QueueImplementation queueImplementation = new QueueImplementation();

    private String threadType;

    private String message;

    private int i;

    private static BufferedReader bufferedReader = null;

    public static void main(String[] args) {

        System.out.println("Starting");
        Client client = new Client("connect");
        client.run();
    }

    public Client(String threadType) {
        super();
        this.threadType = threadType;
    }

    @Override
    public void run() {
        try {
            switch (threadType) {
            case ("connect"):
                connectServer();
                break;
            case ("insertPool"):
                insertIntoQueue();
                break;
            case ("ReadPool"):
                readMessage();
                break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void insertIntoQueue() throws InterruptedException, IOException {
        int i = 1;
        BufferedReader reader = bufferedReader;
        System.out.println("IS buffered reader empty : " + reader);
        while (reader != null) {
            String message = reader.readLine();
            System.out.println("Reading message from server : " + message);
            if (message != null) {
                queueImplementation.insertInQueue(message.getBytes());
                System.out.println("Inserted in a Queue");
            } else {
                System.out.println("Null message read from server ");
            }
            i++;
        }

    }

    private void connectServer() {
        try {
            Socket s = new Socket("localhost", 8080);
            InputStreamReader inputStreamReader = new InputStreamReader(s.getInputStream());
            bufferedReader = new BufferedReader(inputStreamReader);
            System.out.println("Client started");
            Runnable client = new Client("insertPool");
            Runnable client2 = new Client("ReadPool", i);
            /*
             * ExecutorService executorService = Executors.newCachedThreadPool();
             * executorService.execute(client2); executorService.execute(client);
             */
            client2.run();
            client.run();



        } catch (Exception e) {
            System.out.println(e);
        } finally {
            System.out.println("Entered in finally");
        }
    }


    public Client(String threadType, String message) {
        super();
        this.threadType = threadType;
        this.message = message;
    }

    public Client(String threadType, int i) {
        super();
        this.threadType = threadType;
        this.i = i;
    }

    private void readMessage() throws InterruptedException {

        while (true) {

            System.out.println("reading data from queue for thread : " + i);
            queueImplementation.retriveFromQueue();
            Thread.sleep(1000);
        }

    }
}

即将出现的输出显示客户端 1 或客户端 2 的线程卡在循环中。哪个不调用另一个线程,它一直在等待。

来自服务器控制台。

Server Started
Writing :String1
Writing :String2
Writing :String3
Writing :String4
Writing :String5
Writing :String6
Writing :String7
Writing :String8
Writing :String9
Writing :String10
Writing :String11
Writing :String12
Writing :String13

来自客户端控制台:

Starting
Client started
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0
Doesn't have any data for threadNumber 
reading data from queue for thread : 0

看到客户端结果它一直保持 运行 直到我 stopped.The 这里的问题是插入线程没有调用。

我的预期输出应该是这样的:

从队列 method/thread 调用中读取的线程以及插入 method/thread 之间的线程也应该在那里。

你不能定义哪个线程应该做什么...但是你可以让 1 个线程做事而其他线程保持...检查 synchronized 关键字

https://www.baeldung.com/java-synchronized

这里的问题是您的客户只是 运行nable。它需要一个支持线程来 运行 它的代码。

当你打电话时

client2.run();

当前线程进入并执行这个运行nable。另一个客户端一直在那里等待前一个客户端完成,但它不会因为它处于循环中。

你需要做的是将这个 运行nable 包装在一个新线程中并像

一样发送它
new Thread(client2).start();