BlockingQueue的put()和高并发
BlockingQueue's put() and high concurrency
我正在使用 Fork/Join 框架并行执行大量计算。
我已经使用一个简单的记录器实现了生产者-消费者模式,以在程序计算时创建控制台输出。生产者和消费者共享一个 BlockingQueue
,我使用 put()
方法,这样我就不会错过更新。
我发现在某些情况下性能很糟糕,VisualVM 告诉我 put()
方法是原因。
当将新消息放入 BlockingQueue
时出现问题时,我的 RecursiveTask
必须等待,但 ForkJoinPool
继续分叉新任务,因此有 2000-5000 个任务正在尝试访问put()
方法导致非常高的并发性。
有没有合适的方法来处理这种情况?
我想是这样的
if(!blockingQueue.offer(message) {
blockingQueue.put(message);
}
使用理论上无限的 BlockingQueue
。
时性能可能会更高
所以我的问题是:是否有一种适当且高效的方法可以在不丢失更新的情况下将对象放入 BlockingQueue
?
提前致谢!
如果您的池正在生成 2000-5000 个任务,那么这就是您的问题。一旦这么多任务开始,您将开始看到 BlocingQueue.put
中的线程争用,这将推高 put
.
的统计信息
使用 BlockingQueue
的全部意义在于,如果消费者比生产者慢(即使是暂时的),那么生产者将 阻塞 直到消费者赶上。这应该会导致上游进程也等待。如果这导致您的上游进程(大概是 FJP)使系统崩溃而不是仅仅阻塞,那将是一个问题。
我建议您使用固定容量的 FJP。
我正在使用 Fork/Join 框架并行执行大量计算。
我已经使用一个简单的记录器实现了生产者-消费者模式,以在程序计算时创建控制台输出。生产者和消费者共享一个 BlockingQueue
,我使用 put()
方法,这样我就不会错过更新。
我发现在某些情况下性能很糟糕,VisualVM 告诉我 put()
方法是原因。
当将新消息放入 BlockingQueue
时出现问题时,我的 RecursiveTask
必须等待,但 ForkJoinPool
继续分叉新任务,因此有 2000-5000 个任务正在尝试访问put()
方法导致非常高的并发性。
有没有合适的方法来处理这种情况?
我想是这样的
if(!blockingQueue.offer(message) {
blockingQueue.put(message);
}
使用理论上无限的 BlockingQueue
。
所以我的问题是:是否有一种适当且高效的方法可以在不丢失更新的情况下将对象放入 BlockingQueue
?
提前致谢!
如果您的池正在生成 2000-5000 个任务,那么这就是您的问题。一旦这么多任务开始,您将开始看到 BlocingQueue.put
中的线程争用,这将推高 put
.
使用 BlockingQueue
的全部意义在于,如果消费者比生产者慢(即使是暂时的),那么生产者将 阻塞 直到消费者赶上。这应该会导致上游进程也等待。如果这导致您的上游进程(大概是 FJP)使系统崩溃而不是仅仅阻塞,那将是一个问题。
我建议您使用固定容量的 FJP。