Producer/Consumer (sort of) with concurrenthashmap and Java threads

I have one thread writing to a ConcurrentHashMap and another thread reading those blocks (without removing them).

A similar question was asked here, which almost provides a solution, but it doesn't quite fit my case.

I have this class:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;

public class BlockStore {

    private ConcurrentHashMap<Integer, Long> blockToOffset = new ConcurrentHashMap<>();
    private RandomAccessFile randomAccessFile;


    public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
        this.randomAccessFile = randomAccessFile;
        randomAccessFile.setLength(fileLength);
    }

    public boolean hasBlock(int blockNumber) {
        return blockToOffset.containsKey(blockNumber);
    }

    public void getBlock(int blockNumber, byte[] buffer) throws BlockNotFoundException, IOException {
        if (hasBlock(blockNumber)) {
            long offset = blockToOffset.get(blockNumber);
            randomAccessFile.seek(offset);
            randomAccessFile.readFully(buffer);
        } else throw new BlockNotFoundException();
    }

    public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
        long offset = (long) blockNumber * NodeUtil.FILE_BUFFER_SIZE; // cast avoids int overflow for large files
        randomAccessFile.seek(offset);
        randomAccessFile.write(buffer);
        blockToOffset.put(blockNumber, offset);
        blockToOffset.notifyAll(); // throws IllegalMonitorStateException: the monitor of blockToOffset is not held here
    }

    public boolean allFilesReceived() throws IOException {
        double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
        return expectedNumberOfBlocks == blockToOffset.size();
    }
}

For example, given the following sequence:

1. thread2.getBlock(0, emptyBuff) // the block does not exist yet, so the thread should wait
2. thread1.writeBlock(0, buffWithData) // writes the block to the file and adds it to the hashmap
3. thread2 is notified that the block now exists, retrieves the offset from the hashmap, reads the corresponding file block into emptyBuff, and carries on.
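That blocking behaviour can be sketched with a plain HashMap guarded by an intrinsic lock (class and method names here are illustrative, not from the original code, and the file I/O is left out so only the synchronization is shown):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: the consumer waits on the store's monitor until the
// producer registers the block's offset and calls notifyAll().
class BlockingOffsetMap {
    private final Map<Integer, Long> blockToOffset = new HashMap<>();

    // Consumer side: block until the producer has registered the block.
    public synchronized long awaitOffset(int blockNumber) throws InterruptedException {
        while (!blockToOffset.containsKey(blockNumber)) {
            wait(); // releases the lock while waiting; woken by notifyAll() below
        }
        return blockToOffset.get(blockNumber);
    }

    // Producer side: register the offset and wake any waiting consumers.
    public synchronized void putOffset(int blockNumber, long offset) {
        blockToOffset.put(blockNumber, offset);
        notifyAll(); // legal here because the method holds this object's monitor
    }
}
```

The key difference from the original code is that `wait()`/`notifyAll()` are called on an object whose monitor is actually held; the file seek/read would then run after `awaitOffset` returns.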

The main difference between this question and the other one is that I don't want to return the hashmap value directly; instead, the BlockStore class should use it internally to perform a further operation that fetches the actual file data.

Thanks! :)

EDIT:

I considered simply polling the BlockStore from thread2 until a value is returned, but that would cause a lot of unnecessary CPU usage.
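For reference, the rejected polling approach looks roughly like this (a self-contained sketch; the AtomicLong stands in for a map entry, with -1 meaning "not written yet"):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the polling approach: the consumer spins until the producer
// publishes a value, sleeping between checks.
class PollingDemo {
    static final AtomicLong offset = new AtomicLong(-1);

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> offset.set(4096L));
        producer.start();
        // Busy-wait loop: every iteration before the producer runs is a wasted
        // wake-up; a shorter sleep burns more CPU, a longer one adds latency.
        while (offset.get() == -1) {
            Thread.sleep(10);
        }
        System.out.println("offset = " + offset.get());
        producer.join();
    }
}
```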

There doesn't seem to be a 'nice' solution to this, so I adapted the referenced question's approach to my own problem. Note that this is only safe as long as a single thread polls the blocking queue; returning the value from getBlock() to multiple readers would not be thread-safe.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class BlockStore {

    private ConcurrentHashMap<Integer, BlockingQueue<Long>> blockToOffset = new ConcurrentHashMap<>();
    private RandomAccessFile randomAccessFile;

    public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
        this.randomAccessFile = randomAccessFile;
        randomAccessFile.setLength(fileLength);
    }

    private BlockingQueue<Long> ensureQueueExists(int key) {
        // computeIfAbsent creates the queue atomically, so the method no longer needs to be synchronized
        return blockToOffset.computeIfAbsent(key, k -> new ArrayBlockingQueue<>(1));
    }

    public void getBlock(int blockNumber, byte[] buffer) {
        BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
        try {
            // poll returns null on timeout; unboxing that straight into a long would throw a NullPointerException
            Long offset = queue.poll(60L, TimeUnit.SECONDS);
            if (offset == null) {
                return; // timed out waiting for the block to be written
            }
            queue.add(offset); // Put offset back into queue. Since getBlock will only be called by one thread, this does not result in a race condition
            randomAccessFile.seek(offset);
            randomAccessFile.readFully(buffer);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
        BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
        long offset = (long) blockNumber * NodeUtil.FILE_BUFFER_SIZE; // cast avoids int overflow for large files
        randomAccessFile.seek(offset);
        randomAccessFile.write(buffer);
        queue.offer(offset); // offer, unlike add, does not throw if the same block is written twice and the capacity-1 queue is already full
        // no need to put the queue back into the map: ensureQueueExists already registered it
    }

    public boolean allFilesReceived() throws IOException {
        // integer ceiling division avoids the floating-point comparison
        long expectedNumberOfBlocks =
                (randomAccessFile.length() + NodeUtil.FILE_BUFFER_SIZE - 1) / NodeUtil.FILE_BUFFER_SIZE;
        // getBlock also creates (still empty) queues, so blockToOffset.size() would
        // overcount; count only queues that actually hold an offset
        long receivedBlocks = blockToOffset.values().stream().filter(q -> !q.isEmpty()).count();
        return expectedNumberOfBlocks == receivedBlocks;
    }
}
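An alternative worth considering (not from the original post): map each block number to a `CompletableFuture<Long>` instead of a capacity-1 BlockingQueue. `computeIfAbsent` makes the "create the slot" step atomic, and a completed future can be read any number of times by any number of threads, which removes the single-reader restriction. The class name below is illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Each block number maps to a future that the producer completes exactly once
// and that any number of consumers can block on.
class FutureOffsetMap {
    private final ConcurrentHashMap<Integer, CompletableFuture<Long>> blockToOffset =
            new ConcurrentHashMap<>();

    // Atomically create-or-fetch the slot for a block, whichever side gets here first.
    private CompletableFuture<Long> slot(int blockNumber) {
        return blockToOffset.computeIfAbsent(blockNumber, k -> new CompletableFuture<>());
    }

    // Producer side: publish the offset; later calls for the same block are no-ops.
    public void putOffset(int blockNumber, long offset) {
        slot(blockNumber).complete(offset);
    }

    // Consumer side: blocks until the producer completes the future.
    public long awaitOffset(int blockNumber) throws Exception {
        return slot(blockNumber).get();
    }
}
```

The file seek/read would then happen after `awaitOffset` returns, exactly as in `getBlock` above.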