Queue中如何实现并发

How to implement concurrency in Queue

我正在尝试实现一个应该同步的队列。要求是:

  1. 如果队列为空并且有新数据到来,那么只要队列不为空,就应该将数据插入队列并开始处理数据。
  2. 如果Queue不为空,说明队列中的数据还在处理中。因此,如果出现一些新数据,则应将数据添加到队列中。

这是我的实现:

class QueueManager {
        private var jobsQueue: LinkedList<String> = LinkedList()
        fun handleIncomingJob() {
          if (isEmpty()) {
             addJ(jobList)
             start()
          } else {
             addJ(jobList)
          }
       }

private fun isEmpty(): Boolean {
        synchronized(this@QueueManager) {
            val ret = jobsQueue.isEmpty()
            return ret
        }
    }

    private fun addJ(jobList: Array<SAPJob>) {
        synchronized(this@QueueManager) {
            jobsQueue.add(jobList)
        }
    }

    private fun remove(): Array<SAPJob> {
        synchronized(this@QueueManager) {
            return jobsQueue.remove()
        }
    }

       fun start() {
            while (!isEmpty()) {
                Thread.sleep(10000)
                remove()
            }
       }
}

但是上面的代码不起作用。 当多个线程一次调用 handleIncomingJob 方法并且所有线程都发现队列为空时,它会失败。根据我的要求,只有第一个线程应该发现队列为空,向其中添加数据并开始处理。并且所有其他线程应该发现,由于第一个线程将项目放入队列中,所以它不是空的,所以他们应该将数据放入队列并退出。 但是按照上面的实现,所有线程都发现队列是空的。我不知道是什么问题。我已经使所有队列方法同步。还是不行

您的同步是多个函数的本地同步,但其中两个或多个函数构成了一个关键部分。换句话说,您不会跨函数调用持有锁,这意味着线程可以读取不一致的状态。例如,processIncomingJob() 函数内部可能会发生以下情况:

  1. 线程 A 调用 isEmpty() 结果为 true.
  2. 线程 A 移入 if 块。
  3. 在线程 A 可以调用另一个线程 addJ(...) 之前,B 调用 isEmpty() 并且因为线程 A 尚未实际添加作业,结果也是 true.
  4. 线程 B 移入 if 块。
  5. 最终线程 A 和线程 B 都调用了 start() 函数,导致了您的问题。

在使代码线程安全时,您必须将整个 工作单元封装在同步上下文中。 "unit-of-work" 实际包含的内容完全取决于代码的业务逻辑。请注意,您的 start() 函数存在同样的问题。

您的代码示例还有其他问题:

  • 您的 jobsQueue 被定义为 LinkedList<String>,但在其他任何地方它似乎都被视为 LinkedList<Array<SAPJob>>
  • 什么是 SAPJob
  • handleIncomingJob() 的上下文中,jobList 是什么?
  • start() 中调用 remove() 但不对结果执行任何操作。
  • 为什么是start()public?

由于这些问题,要完全理解您希望在代码中发生什么并不容易。但是,根据您对意图和问题的描述,我相信以下示例可以帮助您找到适合您的用例的解决方案:

import java.util.LinkedList
import java.util.Queue

typealias Job = () -> Unit

class JobProcessor {

    private val lock = Any()
    private val queue: Queue<Job> = LinkedList()
    private var isExecuting = false

    fun processJob(job: Job) {
        val execute = synchronized(lock) {
            queue.add(job)
            !isExecuting.also { isExecuting = true }
        }
        if (execute) {
            executeQueuedJobs()
        }
    }

    private fun executeQueuedJobs() {
        var keepExecuting: Boolean
        do {
            val job = synchronized(lock) { queue.remove() }
            try {
                job()
            } catch (ex: Exception) {
                Thread.currentThread().uncaughtExceptionHandler
                    .uncaughtException(Thread.currentThread(), ex)
            }
            keepExecuting = synchronized(lock) {
                queue.isNotEmpty().also { isExecuting = it }
            }
        } while (keepExecuting)
    }

}

当一个作业被添加到队列中时,添加作业的线程将只执行该作业,没有其他线程当前正在执行以前的作业。执行作业的线程将继续这样做,直到当前作业完成后队列为空。请注意,执行作业时 没有 持有锁,以便在执行一个作业时允许其他作业排队。 isExecuting的目的是处理线程正在执行作业但队列为空的情况。