来自一个线程的非空值,在另一个线程中为空

Not null value from one thread, is null in another one

我正在 Java 中的多客户端 TCP 服务器上工作。由于服务器必须能够同时执行大量操作,因此我创建了一个线程,用于通过套接字向我的客户端发送数据。定义如下:

public class MessageSender implements Runnable{

    private OutputStream output;
    private volatile ArrayList<byte[]> messagesToSend;

    public MessageSender(OutputStream _output)
    {
        output = _output;
        messagesToSend = new ArrayList<byte[]>(0);
    }

    public void run()
    {
        while (true)
        {
           if (messagesToSend.size() > 0)
           {
               sendMessage(messagesToSend.get(0));
               messagesToSend.remove(0);
           }
        }
    }

    public void setMessageToSend(Message message)
    {
        messagesToSend.add(message.getMessageHandler().getBytes());
    }

    public void sendMessage(byte[] bytes)
    {
        if (bytes == null)
        {
            System.out.println("bytes = null");
            return ;
        }
        try
        {
            output.write(bytes, 0, bytes.length);
        }
        catch (IOException e)
        {
            System.out.println(e);
        }
    }
}

您在setMessageToSend()中看到的classMessage是我的class。我还创建了其他 classes(Channel_testChannel_status),对应于特定的消息,并且继承自 Message class。

我有另一个线程,我在其中创建了一个 Channel_status,如下所示:

eventBus.consumer(msg + port , message -> {
    sender.setMessageToSend(new Channel_status(eParameters));
    channel_test = new Channel_test(eParameters);
    System.out.println("Created Message");
});

这是一个 Vertx 处理程序,仅在收到来自客户端的特定消息时调用一次。它被调用,并且 channel_status 被按预期实例化。在同一个线程中,我有一个 Vertx 计时器,我将 channel_status class 包含的字节数组发送到 MessageSender

vertx.setPeriodic(duration, id -> {
    System.out.println(Arrays.toString(channel_test.getMessageHandler().getBytes()));
    sender.setMessageToSend(channel_test);
});

如您所见,我正在打印它的值以确保它不为空。而且,它在计时器中永远不会为空。但是当 MessageSender 收到它时,有时它是空的(有时我的意思是大约 1/50 次)。为什么?是并发问题吗?

谢谢。

将您的 run 方法更改为

public void run() {
    while (true) {
        synchronized(messagesToSend) {
           while (messagesToSend.isEmpty()) {
               messagesToSend.wait();
           }
        }
        sendMessage(messagesToSend.get(0));
        messagesToSend.remove(0);
   }

}

setMessageToSend()作为

public void setMessageToSend(Message message) {
    messagesToSend.add(message.getMessageHandler().getBytes());
    messagesToSend.notify();
}

使得ArrayList<byte[]>volatile保证当你在线程1中添加新元素时,线程2可以看到它。

您应该使用支持concurrency的列表,例如:

messagesToSend  = Collections.synchronizedList(new ArrayList<byte[]>(0));

这可以确保当您通过 setMessageToSend 添加新元素时,新元素将在 run 方法中可见。

更新:

Collections.synchronizedList在这种情况下仍然不能保证线程安全。由于在 run 方法中,这三个操作不是原子的。

if (messagesToSend.size() > 0)
{
    sendMessage(messagesToSend.get(0));  
    messagesToSend.remove(0);
}

一个BlockingQueue可以解决这个问题:

private BlockingQueue<byte[]> messagesToSend;

public MessageSender(OutputStream _output) {
    output = _output;
    messagesToSend = new LinkedBlockingQueue<>();
}

public void run() {
    while (true) {
        try {
            sendMessage(messagesToSend.take());  // this will get blocked untill the queue is not empty
        } catch (InterruptedException e) {

        }
    }
}

public void setMessageToSend(Message message) {
    try {
        messagesToSend.put(message.getMessageHandler().getBytes()); // put new element in the queue, check if run method is blocked, wake it up
    } catch (InterruptedException e) {

    }
}