Producer/Consumer(某种程度上)使用并发哈希映射和 Java 线程
Producer/Consumer (sort of) with concurrenthashmap and Java threads
我有一个线程写入并发哈希映射,另一个线程读取这些块(不删除它们)。
给出了一个类似的问题here,几乎提供了一个解决方案,但并不完全适用于我的情况。
我有这个class:
public class BlockStore {
private ConcurrentHashMap<Integer, Long> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
public boolean hasBlock(int blockNumber) {
return blockToOffset.containsKey(blockNumber);
}
public void getBlock(int blockNumber, byte[] buffer) throws BlockNotFoundException, IOException {
if (hasBlock(blockNumber)) {
long offset = blockToOffset.get(blockNumber);
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} else throw new BlockNotFoundException();
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
blockToOffset.put(blockNumber, offset);
blockToOffset.notifyAll();
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}
- 线程 1 向块存储提供块,这些块在并发哈希图中进行了索引。这些块是写入文件的实际 byte[] 的抽象。该块在写入之前是空的。
- Thread2 试图在索引 i 处获取一个块,如果它不存在,它应该等到它存在并检索它。线程 2 稍后可能会再次访问同一个块,因此无法将其删除。
例如给定以下情况
1: thread2.getBlock(0, emtpyBuff) // 块不存在所以线程等待
2. thread1.writeBlock(0, buffWithData) // 将数据块写入文件,将块添加到 hashmap。
3:thread2 被通知块现在存在,从 hashmap 中检索偏移量,将相应的文件块写入 emptyBuff,然后继续执行操作。
这个问题和另一个问题的主要区别是我不想直接 return hashmap 的值,而是在 blockstore class 中对其执行进一步的操作获取实际文件数据。
谢谢! :)
编辑:
我考虑过简单地从 thread2 轮询 blockstore 直到给出 return 值,但这会导致很多不必要的 CPU 使用。
这个问题似乎没有 'nice' 解决方案,所以我修改了参考问题以匹配我自己的问题。这只是一个安全的解决方案,当一个线程作为轮询从阻塞队列中读取时,return getBlock() 中的值不是线程安全的。
public class BlockStore {
private ConcurrentHashMap<Integer, BlockingQueue<Long>> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
private synchronized BlockingQueue<Long> ensureQueueExists(int key) {
if (blockToOffset.containsKey(key)) {
return blockToOffset.get(key);
} else {
BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1);
blockToOffset.put(key, queue);
return queue;
}
}
public void getBlock(int blockNumber, byte[] buffer) {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
try {
long offset = queue.poll(60L, TimeUnit.SECONDS);
queue.add(offset); // Put offset back into queue. Since get will only be called by one thread, this does not result in a race condition
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
queue.add(offset);
blockToOffset.put(blockNumber, queue);
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}
我有一个线程写入并发哈希映射,另一个线程读取这些块(不删除它们)。
给出了一个类似的问题here,几乎提供了一个解决方案,但并不完全适用于我的情况。
我有这个class:
public class BlockStore {
private ConcurrentHashMap<Integer, Long> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
public boolean hasBlock(int blockNumber) {
return blockToOffset.containsKey(blockNumber);
}
public void getBlock(int blockNumber, byte[] buffer) throws BlockNotFoundException, IOException {
if (hasBlock(blockNumber)) {
long offset = blockToOffset.get(blockNumber);
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} else throw new BlockNotFoundException();
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
blockToOffset.put(blockNumber, offset);
blockToOffset.notifyAll();
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}
- 线程 1 向块存储提供块,这些块在并发哈希图中进行了索引。这些块是写入文件的实际 byte[] 的抽象。该块在写入之前是空的。
- Thread2 试图在索引 i 处获取一个块,如果它不存在,它应该等到它存在并检索它。线程 2 稍后可能会再次访问同一个块,因此无法将其删除。
例如给定以下情况
1: thread2.getBlock(0, emtpyBuff) // 块不存在所以线程等待
2. thread1.writeBlock(0, buffWithData) // 将数据块写入文件,将块添加到 hashmap。
3:thread2 被通知块现在存在,从 hashmap 中检索偏移量,将相应的文件块写入 emptyBuff,然后继续执行操作。
这个问题和另一个问题的主要区别是我不想直接 return hashmap 的值,而是在 blockstore class 中对其执行进一步的操作获取实际文件数据。
谢谢! :)
编辑:
我考虑过简单地从 thread2 轮询 blockstore 直到给出 return 值,但这会导致很多不必要的 CPU 使用。
这个问题似乎没有 'nice' 解决方案,所以我修改了参考问题以匹配我自己的问题。这只是一个安全的解决方案,当一个线程作为轮询从阻塞队列中读取时,return getBlock() 中的值不是线程安全的。
public class BlockStore {
private ConcurrentHashMap<Integer, BlockingQueue<Long>> blockToOffset = new ConcurrentHashMap<>();
private RandomAccessFile randomAccessFile;
public BlockStore(RandomAccessFile randomAccessFile, long fileLength) throws IOException {
this.randomAccessFile = randomAccessFile;
randomAccessFile.setLength(fileLength);
}
private synchronized BlockingQueue<Long> ensureQueueExists(int key) {
if (blockToOffset.containsKey(key)) {
return blockToOffset.get(key);
} else {
BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1);
blockToOffset.put(key, queue);
return queue;
}
}
public void getBlock(int blockNumber, byte[] buffer) {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
try {
long offset = queue.poll(60L, TimeUnit.SECONDS);
queue.add(offset); // Put offset back into queue. Since get will only be called by one thread, this does not result in a race condition
randomAccessFile.seek(offset);
randomAccessFile.readFully(buffer);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public void writeBlock(int blockNumber, byte[] buffer) throws IOException {
BlockingQueue<Long> queue = ensureQueueExists(blockNumber);
long offset = blockNumber * NodeUtil.FILE_BUFFER_SIZE;
randomAccessFile.seek(offset);
randomAccessFile.write(buffer);
queue.add(offset);
blockToOffset.put(blockNumber, queue);
}
public boolean allFilesReceived() throws IOException {
double expectedNumberOfBlocks = Math.ceil(((double) randomAccessFile.length()/NodeUtil.FILE_BUFFER_SIZE));
return expectedNumberOfBlocks == blockToOffset.size();
}
}