Queue中如何实现并发
How to implement concurrency in Queue
我正在尝试实现一个应该同步的队列。要求是:
- 如果队列为空并且有新数据到来,那么只要队列不为空,就应该将数据插入队列并开始处理数据。
- 如果Queue不为空,说明队列中的数据还在处理中。因此,如果出现一些新数据,则应将数据添加到队列中。
这是我的实现:
class QueueManager {
private var jobsQueue: LinkedList<String> = LinkedList()
fun handleIncomingJob() {
if (isEmpty()) {
addJ(jobList)
start()
} else {
addJ(jobList)
}
}
private fun isEmpty(): Boolean {
synchronized(this@QueueManager) {
val ret = jobsQueue.isEmpty()
return ret
}
}
private fun addJ(jobList: Array<SAPJob>) {
synchronized(this@QueueManager) {
jobsQueue.add(jobList)
}
}
private fun remove(): Array<SAPJob> {
synchronized(this@QueueManager) {
return jobsQueue.remove()
}
}
fun start() {
while (!isEmpty()) {
Thread.sleep(10000)
remove()
}
}
}
但是上面的代码不起作用。
当多个线程一次调用 handleIncomingJob
方法并且所有线程都发现队列为空时,它会失败。根据我的要求,只有第一个线程应该发现队列为空,向其中添加数据并开始处理。并且所有其他线程应该发现,由于第一个线程将项目放入队列中,所以它不是空的,所以他们应该将数据放入队列并退出。
但是按照上面的实现,所有线程都发现队列是空的。我不知道是什么问题。我已经使所有队列方法同步。还是不行
您的同步是多个函数的本地同步,但其中两个或多个函数构成了一个关键部分。换句话说,您不会跨函数调用持有锁,这意味着线程可以读取不一致的状态。例如,processIncomingJob()
函数内部可能会发生以下情况:
- 线程
A
调用 isEmpty()
结果为 true
.
- 线程
A
移入 if
块。
- 在线程
A
可以调用另一个线程 addJ(...)
之前,B
调用 isEmpty()
并且因为线程 A
尚未实际添加作业,结果也是 true
.
- 线程
B
移入 if
块。
- 最终线程
A
和线程 B
都调用了 start()
函数,导致了您的问题。
在使代码线程安全时,您必须将整个 工作单元封装在同步上下文中。 "unit-of-work" 实际包含的内容完全取决于代码的业务逻辑。请注意,您的 start()
函数存在同样的问题。
您的代码示例还有其他问题:
- 您的
jobsQueue
被定义为 LinkedList<String>
,但在其他任何地方它似乎都被视为 LinkedList<Array<SAPJob>>
。
- 什么是
SAPJob
?
- 在
handleIncomingJob()
的上下文中,jobList
是什么?
- 在
start()
中调用 remove()
但不对结果执行任何操作。
- 为什么是
start()
public?
由于这些问题,要完全理解您希望在代码中发生什么并不容易。但是,根据您对意图和问题的描述,我相信以下示例可以帮助您找到适合您的用例的解决方案:
import java.util.LinkedList
import java.util.Queue
typealias Job = () -> Unit
class JobProcessor {
private val lock = Any()
private val queue: Queue<Job> = LinkedList()
private var isExecuting = false
fun processJob(job: Job) {
val execute = synchronized(lock) {
queue.add(job)
!isExecuting.also { isExecuting = true }
}
if (execute) {
executeQueuedJobs()
}
}
private fun executeQueuedJobs() {
var keepExecuting: Boolean
do {
val job = synchronized(lock) { queue.remove() }
try {
job()
} catch (ex: Exception) {
Thread.currentThread().uncaughtExceptionHandler
.uncaughtException(Thread.currentThread(), ex)
}
keepExecuting = synchronized(lock) {
queue.isNotEmpty().also { isExecuting = it }
}
} while (keepExecuting)
}
}
当一个作业被添加到队列中时,添加作业的线程将只执行该作业,没有其他线程当前正在执行以前的作业。执行作业的线程将继续这样做,直到当前作业完成后队列为空。请注意,执行作业时 没有 持有锁,以便在执行一个作业时允许其他作业排队。 isExecuting
的目的是处理线程正在执行作业但队列为空的情况。
我正在尝试实现一个应该同步的队列。要求是:
- 如果队列为空并且有新数据到来,那么只要队列不为空,就应该将数据插入队列并开始处理数据。
- 如果Queue不为空,说明队列中的数据还在处理中。因此,如果出现一些新数据,则应将数据添加到队列中。
这是我的实现:
class QueueManager {
private var jobsQueue: LinkedList<String> = LinkedList()
fun handleIncomingJob() {
if (isEmpty()) {
addJ(jobList)
start()
} else {
addJ(jobList)
}
}
private fun isEmpty(): Boolean {
synchronized(this@QueueManager) {
val ret = jobsQueue.isEmpty()
return ret
}
}
private fun addJ(jobList: Array<SAPJob>) {
synchronized(this@QueueManager) {
jobsQueue.add(jobList)
}
}
private fun remove(): Array<SAPJob> {
synchronized(this@QueueManager) {
return jobsQueue.remove()
}
}
fun start() {
while (!isEmpty()) {
Thread.sleep(10000)
remove()
}
}
}
但是上面的代码不起作用。
当多个线程一次调用 handleIncomingJob
方法并且所有线程都发现队列为空时,它会失败。根据我的要求,只有第一个线程应该发现队列为空,向其中添加数据并开始处理。并且所有其他线程应该发现,由于第一个线程将项目放入队列中,所以它不是空的,所以他们应该将数据放入队列并退出。
但是按照上面的实现,所有线程都发现队列是空的。我不知道是什么问题。我已经使所有队列方法同步。还是不行
您的同步是多个函数的本地同步,但其中两个或多个函数构成了一个关键部分。换句话说,您不会跨函数调用持有锁,这意味着线程可以读取不一致的状态。例如,processIncomingJob()
函数内部可能会发生以下情况:
- 线程
A
调用isEmpty()
结果为true
. - 线程
A
移入if
块。 - 在线程
A
可以调用另一个线程addJ(...)
之前,B
调用isEmpty()
并且因为线程A
尚未实际添加作业,结果也是true
. - 线程
B
移入if
块。 - 最终线程
A
和线程B
都调用了start()
函数,导致了您的问题。
在使代码线程安全时,您必须将整个 工作单元封装在同步上下文中。 "unit-of-work" 实际包含的内容完全取决于代码的业务逻辑。请注意,您的 start()
函数存在同样的问题。
您的代码示例还有其他问题:
- 您的
jobsQueue
被定义为LinkedList<String>
,但在其他任何地方它似乎都被视为LinkedList<Array<SAPJob>>
。 - 什么是
SAPJob
? - 在
handleIncomingJob()
的上下文中,jobList
是什么? - 在
start()
中调用remove()
但不对结果执行任何操作。 - 为什么是
start()
public?
由于这些问题,要完全理解您希望在代码中发生什么并不容易。但是,根据您对意图和问题的描述,我相信以下示例可以帮助您找到适合您的用例的解决方案:
import java.util.LinkedList
import java.util.Queue
typealias Job = () -> Unit
class JobProcessor {
private val lock = Any()
private val queue: Queue<Job> = LinkedList()
private var isExecuting = false
fun processJob(job: Job) {
val execute = synchronized(lock) {
queue.add(job)
!isExecuting.also { isExecuting = true }
}
if (execute) {
executeQueuedJobs()
}
}
private fun executeQueuedJobs() {
var keepExecuting: Boolean
do {
val job = synchronized(lock) { queue.remove() }
try {
job()
} catch (ex: Exception) {
Thread.currentThread().uncaughtExceptionHandler
.uncaughtException(Thread.currentThread(), ex)
}
keepExecuting = synchronized(lock) {
queue.isNotEmpty().also { isExecuting = it }
}
} while (keepExecuting)
}
}
当一个作业被添加到队列中时,添加作业的线程将只执行该作业,没有其他线程当前正在执行以前的作业。执行作业的线程将继续这样做,直到当前作业完成后队列为空。请注意,执行作业时 没有 持有锁,以便在执行一个作业时允许其他作业排队。 isExecuting
的目的是处理线程正在执行作业但队列为空的情况。