来自一个线程的非空值,在另一个线程中为空
Not null value from one thread, is null in another one
我正在 Java 中的多客户端 TCP 服务器上工作。由于服务器必须能够同时执行大量操作,因此我创建了一个线程,用于通过套接字向我的客户端发送数据。定义如下:
public class MessageSender implements Runnable{
private OutputStream output;
private volatile ArrayList<byte[]> messagesToSend;
public MessageSender(OutputStream _output)
{
output = _output;
messagesToSend = new ArrayList<byte[]>(0);
}
public void run()
{
while (true)
{
if (messagesToSend.size() > 0)
{
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
}
}
public void setMessageToSend(Message message)
{
messagesToSend.add(message.getMessageHandler().getBytes());
}
public void sendMessage(byte[] bytes)
{
if (bytes == null)
{
System.out.println("bytes = null");
return ;
}
try
{
output.write(bytes, 0, bytes.length);
}
catch (IOException e)
{
System.out.println(e);
}
}
}
您在setMessageToSend()
中看到的classMessage
是我的class。我还创建了其他 classes(Channel_test
、Channel_status
),对应于特定的消息,并且继承自 Message
class。
我有另一个线程,我在其中创建了一个 Channel_status
,如下所示:
eventBus.consumer(msg + port , message -> {
sender.setMessageToSend(new Channel_status(eParameters));
channel_test = new Channel_test(eParameters);
System.out.println("Created Message");
});
这是一个 Vertx
处理程序,仅在收到来自客户端的特定消息时调用一次。它被调用,并且 channel_status 被按预期实例化。在同一个线程中,我有一个 Vertx 计时器,我将 channel_status class 包含的字节数组发送到 MessageSender
。
vertx.setPeriodic(duration, id -> {
System.out.println(Arrays.toString(channel_test.getMessageHandler().getBytes()));
sender.setMessageToSend(channel_test);
});
如您所见,我正在打印它的值以确保它不为空。而且,它在计时器中永远不会为空。但是当 MessageSender 收到它时,有时它是空的(有时我的意思是大约 1/50 次)。为什么?是并发问题吗?
谢谢。
将您的 run
方法更改为
public void run() {
while (true) {
synchronized(messagesToSend) {
while (messagesToSend.isEmpty()) {
messagesToSend.wait();
}
}
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
}
和setMessageToSend()
作为
public void setMessageToSend(Message message) {
messagesToSend.add(message.getMessageHandler().getBytes());
messagesToSend.notify();
}
使得ArrayList<byte[]>
volatile
保证当你在线程1中添加新元素时,线程2可以看到它。
您应该使用支持concurrency的列表,例如:
messagesToSend = Collections.synchronizedList(new ArrayList<byte[]>(0));
这可以确保当您通过 setMessageToSend
添加新元素时,新元素将在 run
方法中可见。
更新:
Collections.synchronizedList
在这种情况下仍然不能保证线程安全。由于在 run
方法中,这三个操作不是原子的。
if (messagesToSend.size() > 0)
{
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
一个BlockingQueue
可以解决这个问题:
private BlockingQueue<byte[]> messagesToSend;
public MessageSender(OutputStream _output) {
output = _output;
messagesToSend = new LinkedBlockingQueue<>();
}
public void run() {
while (true) {
try {
sendMessage(messagesToSend.take()); // this will get blocked untill the queue is not empty
} catch (InterruptedException e) {
}
}
}
public void setMessageToSend(Message message) {
try {
messagesToSend.put(message.getMessageHandler().getBytes()); // put new element in the queue, check if run method is blocked, wake it up
} catch (InterruptedException e) {
}
}
我正在 Java 中的多客户端 TCP 服务器上工作。由于服务器必须能够同时执行大量操作,因此我创建了一个线程,用于通过套接字向我的客户端发送数据。定义如下:
public class MessageSender implements Runnable{
private OutputStream output;
private volatile ArrayList<byte[]> messagesToSend;
public MessageSender(OutputStream _output)
{
output = _output;
messagesToSend = new ArrayList<byte[]>(0);
}
public void run()
{
while (true)
{
if (messagesToSend.size() > 0)
{
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
}
}
public void setMessageToSend(Message message)
{
messagesToSend.add(message.getMessageHandler().getBytes());
}
public void sendMessage(byte[] bytes)
{
if (bytes == null)
{
System.out.println("bytes = null");
return ;
}
try
{
output.write(bytes, 0, bytes.length);
}
catch (IOException e)
{
System.out.println(e);
}
}
}
您在setMessageToSend()
中看到的classMessage
是我的class。我还创建了其他 classes(Channel_test
、Channel_status
),对应于特定的消息,并且继承自 Message
class。
我有另一个线程,我在其中创建了一个 Channel_status
,如下所示:
eventBus.consumer(msg + port , message -> {
sender.setMessageToSend(new Channel_status(eParameters));
channel_test = new Channel_test(eParameters);
System.out.println("Created Message");
});
这是一个 Vertx
处理程序,仅在收到来自客户端的特定消息时调用一次。它被调用,并且 channel_status 被按预期实例化。在同一个线程中,我有一个 Vertx 计时器,我将 channel_status class 包含的字节数组发送到 MessageSender
。
vertx.setPeriodic(duration, id -> {
System.out.println(Arrays.toString(channel_test.getMessageHandler().getBytes()));
sender.setMessageToSend(channel_test);
});
如您所见,我正在打印它的值以确保它不为空。而且,它在计时器中永远不会为空。但是当 MessageSender 收到它时,有时它是空的(有时我的意思是大约 1/50 次)。为什么?是并发问题吗?
谢谢。
将您的 run
方法更改为
public void run() {
while (true) {
synchronized(messagesToSend) {
while (messagesToSend.isEmpty()) {
messagesToSend.wait();
}
}
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
}
和setMessageToSend()
作为
public void setMessageToSend(Message message) {
messagesToSend.add(message.getMessageHandler().getBytes());
messagesToSend.notify();
}
使得ArrayList<byte[]>
volatile
您应该使用支持concurrency的列表,例如:
messagesToSend = Collections.synchronizedList(new ArrayList<byte[]>(0));
这可以确保当您通过 setMessageToSend
添加新元素时,新元素将在 run
方法中可见。
更新:
Collections.synchronizedList
在这种情况下仍然不能保证线程安全。由于在 run
方法中,这三个操作不是原子的。
if (messagesToSend.size() > 0)
{
sendMessage(messagesToSend.get(0));
messagesToSend.remove(0);
}
一个BlockingQueue
可以解决这个问题:
private BlockingQueue<byte[]> messagesToSend;
public MessageSender(OutputStream _output) {
output = _output;
messagesToSend = new LinkedBlockingQueue<>();
}
public void run() {
while (true) {
try {
sendMessage(messagesToSend.take()); // this will get blocked untill the queue is not empty
} catch (InterruptedException e) {
}
}
}
public void setMessageToSend(Message message) {
try {
messagesToSend.put(message.getMessageHandler().getBytes()); // put new element in the queue, check if run method is blocked, wake it up
} catch (InterruptedException e) {
}
}