关于 ConcurrentLinkedQueue

Regarding ConcurrentLinkedQueue

我有一个多线程场景,其中使用了一个使用 Packet 类型的 ConcurrentLinkedQueue 的 Sender class。 Packet 是一个 POJO 定义为:

class Packet {
    int seq;
    byte[] data;
}

发件人 class 定义为:

class Sender implements Runnable {
     private static ConcurrentLinkedQueue<Packet> queue;

     public Sender() {
        if (ccl == null)
        ccl = new ConcurrentLinkedQueue<Packet>();
     }

     public static boolean enqueue(Packet p) {
        if (queue == null) {
           queue = new ConcurrentLinkedQueue<Packet>();
        }
        //System.out.println(p.toString());
        return queue.add(p);
     }

     @Override
     public void run() {
       TcpSend tcp = new TcpSend();

       while (true) {
            Packet p = queue.remove(); 
            // some code here
            //System.out.println(p.toString());
            tcp.send(p);
       }
   }
}

来自另一个 java class 我正在将文件读入字节数组并将其添加到 ConcurrentLinkedQueue。当我在 enqueue() 中打印数据包的详细信息时,我得到了正确的详细信息。但是当我在 运行() 中打印详细信息时,即从队列中删除数据包后,我得到了正确的序列,但我得到了添加到队列中的最后一个数据包的数据。对于从队列中删除的所有数据包,都会发生这种情况。

数据通过以下方法添加

  public void addPacket() {
    int bytesRead = 0; 
    int seq = 1;  
    byte[] fileInBytes = new byte[1500];
    BufferedInputStream in = new BufferedInputStream(new    
                                            FileInputStream(fileName));

    while((bytesRead = in.read(fileInBytes)) != -1) {   
            Sender.enqueue(new Packet(seq, fileInBytes);
            seq++;
    }
  }

求推荐。 TIA

您正在使用相同的字节数组 (fileInBytes) 从流中读取:

byte[] fileInBytes = new byte[1500];
BufferedInputStream in = new BufferedInputStream(new    
                                        FileInputStream(fileName));

while((bytesRead = in.read(fileInBytes)) != -1) {   
        Sender.enqueue(new Packet(seq, fileInBytes);
        seq++;
}

我猜测是Packet的构造函数没有复制字节数组:

class Packet {
    int seq;
    byte[] data;

    public Packet(int seq, byte[] data) {
       this.seq = seq;
       this.data = data;
   }
}

这意味着您的所有 Packet.data 字段都指向同一个字节数组,每次您从流中读取时该数组都会被覆盖。

您需要使用副本:

byte[] fileInBytes = new byte[1500];
BufferedInputStream in = new BufferedInputStream(new    
                                        FileInputStream(fileName));

while((bytesRead = in.read(fileInBytes)) != -1) {
        byte[] packetBytes = Arrays.copyOf(fileInBytes, bytesRead);
        Sender.enqueue(new Packet(seq, packetBytes );
        seq++;
}