使用 ExecutorService 和 BlockingQueue 锁定(停放)Runnable
Runnable locked (park) using ExecutorService and BlockingQueue
注意:我了解网站规则,但我不能把所有代码(complex/large代码)。
我放一个不同的(所有真正的代码太多了,你不需要这里) Github 中的代码,但像视频一样重现问题(主要 class 是 joseluisbz.mock.support.TestOptimalDSP
,切换 class 是 joseluisbz.mock.support.runnable.ProcessorDSP
)。
请不要向我推荐此代码的其他 jar 或外部库。
我希望更具体一些,但我不知道要提取和显示哪一部分。
在你关闭这个问题之前:显然,如果有人告诉我在哪里看(技术细节),我愿意完善我的问题。
我做了一个 video 以显示我的问题。
连提问题,我都画了个图来说明情况。
我的程序有一个JTree
,显示Worker
.
之间的关系
我有一个控制生命的线程与 ExecutorService executorService = Executors.newCachedThreadPool();
和 List<Future<?>> listFuture = Collections.synchronizedList(new ArrayList<>());
之间的交互图
每个 Runnable
在其构造函数中以这种方式 listFuture().add(executorService().submit(this));
启动。列表是这样创建的:BlockingQueue<Custom> someBlockingQueue = new LinkedBlockingQueue<>();
我的图表显示了 Worker
的父亲,如果他有的话。
它还显示了 BlockingQueue
.
之间的书写关系
RunnableStopper
停止 Worker
中包含的相关可运行程序,如 属性。
RunnableDecrementer
、RunnableIncrementer
、RunnableFilter
运行一个循环,该循环运行它为其 BlockingQueue
接收的每个自定义。
他们总是为此创建一个 RunnableProcessor
(它没有循环,但由于其处理时间长,一旦任务完成,它应该由 GC
收集)。
内部 RunnableIncrementer
有一个地图 Map<Integer, List<Custom>> mapListDelayedCustom = new HashMap<>();//Collections.synchronizedMap(new HashMap<>());
当到达一些自定义...我需要获取 lastReceivedCustom 的列表 List<Custom> listDelayedCustom = mapListDelayedCustom.putIfAbsent(custom.getCode(), new ArrayList<>());
我正在控制大小(不会无限增长)。
当我添加以下行时,我的代码停止工作:
if (listDelayedCustom.size() > SomeValue) {
//No operation has yet been included in if sentence
}
但是注释行不会阻塞
//if (listDelayedCustom.size() > SomeValue) {
// //No operation has yet been included in if sentence
//}
什么会阻止我的 Runnable?
添加上面指示的行(评估列表的大小:if sentence
)停止工作是没有意义的。
有什么建议可以进一步说明我的问题吗?
首先,您设置线程名称的方式是错误的。您使用此模式:
public class Test
{
public static class Task implements Runnable
{
public Task()
{
Thread.currentThread().setName("Task");
}
@Override
public void run()
{
System.out.println("Task: "+Thread.currentThread().getName());
}
}
public static void main(String[] args)
{
new Thread(new Task()).start();
System.out.println("Main: "+Thread.currentThread().getName());
}
}
给出(不希望的)结果:
Main: Task
Task: Thread-0
这是不正确的,因为在 Task
构造函数中,线程尚未启动,因此您正在更改调用线程的名称,而不是衍生线程的名称。您应该在 run()
方法中设置名称。
因此,您截图中的线程名称是错误的。
现在是真正的问题。在 WorkerDSPIncrement 中,您有这一行:
List<ChunkDTO> listDelayedChunkDTO = mapListDelayedChunkDTO.putIfAbsent(chunkDTO.getPitch(), new ArrayList<>());
putIfAbsent() 的文档说:
If the specified key is not already associated with a value (or is mapped to null) associates it with the given value and returns null, else returns the current value.
由于地图最初是空的,所以您第一次调用 putIfAbsent()
时,它 returns null
并将其分配给 listDelayedChunkDTO
。
然后你创建一个ProcessorDSP
对象:
ProcessorDSP processorDSP = new ProcessorDSP(controlDSP, upNodeDSP, null,
dHnCoefficients, chunkDTO, listDelayedChunkDTO, Arrays.asList(parent.getParentBlockingQueue()));
这意味着你将null
作为listDelayedChunkDTO
参数传递。所以当这一行在 ProcessorDSP
:
中执行时
if (listDelayedChunkDTO.size() > 2) {
它抛出一个 NullPointerException
并且 runnable 停止。
注意:我了解网站规则,但我不能把所有代码(complex/large代码)。
我放一个不同的(所有真正的代码太多了,你不需要这里) Github 中的代码,但像视频一样重现问题(主要 class 是 joseluisbz.mock.support.TestOptimalDSP
,切换 class 是 joseluisbz.mock.support.runnable.ProcessorDSP
)。
请不要向我推荐此代码的其他 jar 或外部库。
我希望更具体一些,但我不知道要提取和显示哪一部分。 在你关闭这个问题之前:显然,如果有人告诉我在哪里看(技术细节),我愿意完善我的问题。
我做了一个 video 以显示我的问题。
连提问题,我都画了个图来说明情况。
我的程序有一个JTree
,显示Worker
.
我有一个控制生命的线程与 ExecutorService executorService = Executors.newCachedThreadPool();
和 List<Future<?>> listFuture = Collections.synchronizedList(new ArrayList<>());
每个 Runnable
在其构造函数中以这种方式 listFuture().add(executorService().submit(this));
启动。列表是这样创建的:BlockingQueue<Custom> someBlockingQueue = new LinkedBlockingQueue<>();
我的图表显示了 Worker
的父亲,如果他有的话。
它还显示了 BlockingQueue
.
RunnableStopper
停止 Worker
中包含的相关可运行程序,如 属性。
RunnableDecrementer
、RunnableIncrementer
、RunnableFilter
运行一个循环,该循环运行它为其 BlockingQueue
接收的每个自定义。
他们总是为此创建一个 RunnableProcessor
(它没有循环,但由于其处理时间长,一旦任务完成,它应该由 GC
收集)。
内部 RunnableIncrementer
有一个地图 Map<Integer, List<Custom>> mapListDelayedCustom = new HashMap<>();//Collections.synchronizedMap(new HashMap<>());
当到达一些自定义...我需要获取 lastReceivedCustom 的列表 List<Custom> listDelayedCustom = mapListDelayedCustom.putIfAbsent(custom.getCode(), new ArrayList<>());
我正在控制大小(不会无限增长)。
当我添加以下行时,我的代码停止工作:
if (listDelayedCustom.size() > SomeValue) {
//No operation has yet been included in if sentence
}
但是注释行不会阻塞
//if (listDelayedCustom.size() > SomeValue) {
// //No operation has yet been included in if sentence
//}
if sentence
)停止工作是没有意义的。
有什么建议可以进一步说明我的问题吗?
首先,您设置线程名称的方式是错误的。您使用此模式:
public class Test
{
public static class Task implements Runnable
{
public Task()
{
Thread.currentThread().setName("Task");
}
@Override
public void run()
{
System.out.println("Task: "+Thread.currentThread().getName());
}
}
public static void main(String[] args)
{
new Thread(new Task()).start();
System.out.println("Main: "+Thread.currentThread().getName());
}
}
给出(不希望的)结果:
Main: Task
Task: Thread-0
这是不正确的,因为在 Task
构造函数中,线程尚未启动,因此您正在更改调用线程的名称,而不是衍生线程的名称。您应该在 run()
方法中设置名称。
因此,您截图中的线程名称是错误的。
现在是真正的问题。在 WorkerDSPIncrement 中,您有这一行:
List<ChunkDTO> listDelayedChunkDTO = mapListDelayedChunkDTO.putIfAbsent(chunkDTO.getPitch(), new ArrayList<>());
putIfAbsent() 的文档说:
If the specified key is not already associated with a value (or is mapped to null) associates it with the given value and returns null, else returns the current value.
由于地图最初是空的,所以您第一次调用 putIfAbsent()
时,它 returns null
并将其分配给 listDelayedChunkDTO
。
然后你创建一个ProcessorDSP
对象:
ProcessorDSP processorDSP = new ProcessorDSP(controlDSP, upNodeDSP, null,
dHnCoefficients, chunkDTO, listDelayedChunkDTO, Arrays.asList(parent.getParentBlockingQueue()));
这意味着你将null
作为listDelayedChunkDTO
参数传递。所以当这一行在 ProcessorDSP
:
if (listDelayedChunkDTO.size() > 2) {
它抛出一个 NullPointerException
并且 runnable 停止。