很多写手一个reader没有并发
Many writers one reader without concurrency
我在 java 中编程,我有一个 List<LogEntry> log
在不同线程之间共享。
这些 "writers" 个线程之间已经同步,因此一次只有一个线程可以从 log
添加或删除元素
但是,由于我试图实现的分布式算法,日志的一部分是 "safe",这意味着它们不能被作者或 [=49] 修改=](我在下面介绍)。 log
的这部分由字段int committedIndex
表示,初始化为0,单调递增。
综上所述,作者在(commitIndex,log.size())
范围内修改log
中的元素,而有一个reader获取范围内包含的log
中的元素[0,commitIndex]
。 reader 从第一个条目开始读取,然后读取下一个条目,直到到达 log.get(commitIndex)
,然后停止并进入睡眠状态,直到增加 commitIndex
。它更新一个字段 lastApplied
,该字段初始化为 0 并单调递增,以记住他在睡觉前阅读的最后一个 logEntry
。
如您所见,无需同步 reader 和编写器,因为它们访问 log
.
的不同部分
我的问题是:当commitIndex
增加时,如何"wake up"reader的线程?我需要这样的东西(由作者执行):
if(commitIndex is updated)
{
//wake up reader
}
和 reader:
public void run() {
while(true){
//go to sleeep...
//now the reader is awaken!
while(lastApplied<commitIndex){
//do something with log.get(lastApplied)
lastApplied++;
}
}
很明显,我极大地简化了我的代码,以便让您尽可能更好地理解我想要什么,如果不够清楚,我很抱歉(请随时向我提问)。谢谢!
试试这个:
if(commitIndex is updated)
{
//wake up reader
synchronized(reader)
{
reader.notify();
}
}
使用共享 LinkedBlockingQueue<Integer>
(在 reader 和所有作者之间)让每个作者向 reader 发出信号,表明 commitIndex
变量已被修改:
作者:
if (commitIndex is updated) {
// wake up reader
this.queue.add(commitIndex);
}
Reader:
public void run() {
while (true) {
// take() puts this thread to sleep until a writer calls add()
int commitIndex = this.queue.take();
// now the reader is awaken!
while (lastApplied < commitIndex) {
// do something with log.get(lastApplied)
lastApplied++;
}
}
}
这里我使用了属性 queue
,它应该对应于 LinkedBlockingQueue
的同一个实例,用于 reader 和所有作者。
注意:异常处理留作练习。
我在 java 中编程,我有一个 List<LogEntry> log
在不同线程之间共享。
这些 "writers" 个线程之间已经同步,因此一次只有一个线程可以从 log
但是,由于我试图实现的分布式算法,日志的一部分是 "safe",这意味着它们不能被作者或 [=49] 修改=](我在下面介绍)。 log
的这部分由字段int committedIndex
表示,初始化为0,单调递增。
综上所述,作者在(commitIndex,log.size())
范围内修改log
中的元素,而有一个reader获取范围内包含的log
中的元素[0,commitIndex]
。 reader 从第一个条目开始读取,然后读取下一个条目,直到到达 log.get(commitIndex)
,然后停止并进入睡眠状态,直到增加 commitIndex
。它更新一个字段 lastApplied
,该字段初始化为 0 并单调递增,以记住他在睡觉前阅读的最后一个 logEntry
。
如您所见,无需同步 reader 和编写器,因为它们访问 log
.
我的问题是:当commitIndex
增加时,如何"wake up"reader的线程?我需要这样的东西(由作者执行):
if(commitIndex is updated)
{
//wake up reader
}
和 reader:
public void run() {
while(true){
//go to sleeep...
//now the reader is awaken!
while(lastApplied<commitIndex){
//do something with log.get(lastApplied)
lastApplied++;
}
}
很明显,我极大地简化了我的代码,以便让您尽可能更好地理解我想要什么,如果不够清楚,我很抱歉(请随时向我提问)。谢谢!
试试这个:
if(commitIndex is updated)
{
//wake up reader
synchronized(reader)
{
reader.notify();
}
}
使用共享 LinkedBlockingQueue<Integer>
(在 reader 和所有作者之间)让每个作者向 reader 发出信号,表明 commitIndex
变量已被修改:
作者:
if (commitIndex is updated) {
// wake up reader
this.queue.add(commitIndex);
}
Reader:
public void run() {
while (true) {
// take() puts this thread to sleep until a writer calls add()
int commitIndex = this.queue.take();
// now the reader is awaken!
while (lastApplied < commitIndex) {
// do something with log.get(lastApplied)
lastApplied++;
}
}
}
这里我使用了属性 queue
,它应该对应于 LinkedBlockingQueue
的同一个实例,用于 reader 和所有作者。
注意:异常处理留作练习。