使用 ExecutorService 和 BlockingQueue 锁定(停放)Runnable

Runnable locked (park) using ExecutorService and BlockingQueue

注意:我了解网站规则,但我不能把所有代码(complex/large代码)。

我放一个不同的所有真正的代码太多了,你不需要这里 Github 中的代码,但像视频一样重现问题(主要 class 是 joseluisbz.mock.support.TestOptimalDSP,切换 class 是 joseluisbz.mock.support.runnable.ProcessorDSP)。

请不要向我推荐此代码的其他 jar 或外部库。

我希望更具体一些,但我不知道要提取和显示哪一部分。 在你关闭这个问题之前:显然,如果有人告诉我在哪里看(技术细节),我愿意完善我的问题。

我做了一个 video 以显示我的问题。

连提问题,我都画了个图来说明情况。

我的程序有一个JTree,显示Worker.

之间的关系

我有一个控制生命的线程与 ExecutorService executorService = Executors.newCachedThreadPool();List<Future<?>> listFuture = Collections.synchronizedList(new ArrayList<>());

之间的交互图

每个 Runnable 在其构造函数中以这种方式 listFuture().add(executorService().submit(this)); 启动。列表是这样创建的:BlockingQueue<Custom> someBlockingQueue = new LinkedBlockingQueue<>();

我的图表显示了 Worker 的父亲,如果他有的话。 它还显示了 BlockingQueue.

之间的书写关系

RunnableStopper 停止 Worker 中包含的相关可运行程序,如 属性。 RunnableDecrementerRunnableIncrementerRunnableFilter 运行一个循环,该循环运行它为其 BlockingQueue 接收的每个自定义。 他们总是为此创建一个 RunnableProcessor(它没有循环,但由于其处理时间长,一旦任务完成,它应该由 GC 收集)。

内部 RunnableIncrementer 有一个地图 Map<Integer, List<Custom>> mapListDelayedCustom = new HashMap<>();//Collections.synchronizedMap(new HashMap<>());

当到达一些自定义...我需要获取 lastReceivedCustom 的列表 List<Custom> listDelayedCustom = mapListDelayedCustom.putIfAbsent(custom.getCode(), new ArrayList<>()); 我正在控制大小(不会无限增长)。

当我添加以下行时,我的代码停止工作:

if (listDelayedCustom.size() > SomeValue) {
  //No operation has yet been included in if sentence
}

但是注释行不会阻塞

//if (listDelayedCustom.size() > SomeValue) {
//  //No operation has yet been included in if sentence
//}

什么会阻止我的 Runnable? 添加上面指示的行(评估列表的大小:if sentence)停止工作是没有意义的。

有什么建议可以进一步说明我的问题吗?

首先,您设置线程名称的方式是错误的。您使用此模式:

public class Test
{
    public static class Task implements Runnable
    {
        public Task()
        {
            Thread.currentThread().setName("Task");
        }

        @Override
        public void run()
        {
            System.out.println("Task: "+Thread.currentThread().getName());
        }
    }

    public static void main(String[] args)
    {
        new Thread(new Task()).start();
        System.out.println("Main: "+Thread.currentThread().getName());
    }
}

给出(不希望的)结果:

Main: Task
Task: Thread-0

这是不正确的,因为在 Task 构造函数中,线程尚未启动,因此您正在更改调用线程的名称,而不是衍生线程的名称。您应该在 run() 方法中设置名称。

因此,您截图中的线程名称是错误的。

现在是真正的问题。在 WorkerDSPIncrement 中,您有这一行:

List<ChunkDTO> listDelayedChunkDTO = mapListDelayedChunkDTO.putIfAbsent(chunkDTO.getPitch(), new ArrayList<>());

putIfAbsent() 的文档说:

If the specified key is not already associated with a value (or is mapped to null) associates it with the given value and returns null, else returns the current value.

由于地图最初是空的,所以您第一次调用 putIfAbsent() 时,它 returns null 并将其分配给 listDelayedChunkDTO

然后你创建一个ProcessorDSP对象:

ProcessorDSP processorDSP = new ProcessorDSP(controlDSP, upNodeDSP, null,
   dHnCoefficients, chunkDTO, listDelayedChunkDTO, Arrays.asList(parent.getParentBlockingQueue()));

这意味着你将null作为listDelayedChunkDTO参数传递。所以当这一行在 ProcessorDSP:

中执行时
if (listDelayedChunkDTO.size() > 2) {

它抛出一个 NullPointerException 并且 runnable 停止。