并发方法设计的数据类型
Data types for a concurrent method design
假设我有一个集合 input
,其中包含大约。 100000 个对象。
有一个worker线程池,每个线程池
- 取该集合的一个元素,
- 做一些计算并且
- 有时(在大约 10% 到最多 50% 的情况下,即每个 运行 10000 到 50000 次之间)将它们的结果添加到
output
集合中。
处理完 input
中的所有项目后,另一个例程接受 output
并用它做一些事情。
我需要线程池以便尽快处理 input
。 input
的每个元素都应该恰好处理一次。
处理元素的顺序无关紧要(对于 input
和 output
)。 output
是只写的——工作人员只会在那里写,不会对 output
.
做任何其他操作
问题有两部分,其中线程安全很重要:
- 工作线程需要确保当工作线程 A 处理
input
的某个元素时,其他工作线程会注意到它并且不会处理相同的元素。
- 工作人员完成对元素的处理后,应将结果添加到
output
集合中。
问题:
- 我可以安全地将哪种集合类型用于
input
集合(ConcurrentLinkedQueue?)?
- 我可以对
output
集合使用正常的 LinkedList
吗(如果 2 个线程同时尝试向列表中添加不同的对象,是否会出现其中一个对象不添加的情况?得救)?
CLQ 适用于给定您的约束的输入,在轮询 size() 以检查输入的终止时要小心:如文档中所述,它不是恒定时间操作。
对于输出,我怀疑 LinkedList 是线程安全的,即使只是为了添加。添加意味着改变头节点的状态,如果两个线程同时添加,这可能会产生问题和分离元素。
您可以使用另一个 CLQ 或 LinkedBlockingDeque 。还有一个更简单的SynchronizedLinkedList。
你没有提到 Java 8,但这是新的 Java 8 并行流库的经典应用:
Collection<Item> input = ... ;
List<Result> output = input.parallelStream()
.map(Item::computeResult)
.filter(Result::matches)
.collect(Collectors.toList());
结果列表是 ArrayList
(尽管这在未来可能会改变)。即使多个线程正在执行处理,这仍然有效,但 ArrayList
不是 thread-safe。这是如何工作的?
它起作用的原因是每个线程都将结果插入到它自己的包含中间结果的列表实例中。最后,中间结果被组合成一个输出列表。这避免了多个线程同时将结果写入输出列表时可能发生的潜在争用。
这里使用的原则是thread-confinement(Goetz,第 3.3 节)。在 multi-threaded 环境中使用非 thread-safe 数据结构是安全的,只要一次 一个线程 可以访问它,并且数据在线程之间安全传递。
如果您未使用 Java 8,则可以使用 fork-join 框架(请参阅 Lea)对 multi-threading 使用一些相同的技术在 Java 7 中引入。它不如 Java 8 Streams 方便。 (实际上,Java 8 Streams 是建立在 Fork/Join 框架之上的。)当然,您将不得不做更多的工作,并且您将无法获得 lambda 的便利。但它确实提供了一种相当方便的方法来构建易于拆分的计算。
关键是构建您的计算,以便块可以表示为 RecursiveTask
。通常,递归任务包含对输入数据结构的引用、数组或列表索引的范围以及存储中间结果的位置。通过拆分索引范围,可以轻松拆分 ("forked") 一个任务。每个分叉任务加入后,其中间结果可以与该任务的中间结果合并。这是以 thread-confined 方式完成的(join 操作处理线程之间的正确切换)。此外,组合阶段也是并行发生的,因为不同线程组合来自计算树不同部分的结果都可以并行进行。
参考文献
Goetz 等。阿尔。 Java 并发实践。 版权所有 2006 Pearson Education
莉亚,道格。 A Java Fork/Join 框架。 Java Grande 上的 ACM 2000 会议论文集。 http://gee.cs.oswego.edu/dl/papers/fj.pdf
假设我有一个集合 input
,其中包含大约。 100000 个对象。
有一个worker线程池,每个线程池
- 取该集合的一个元素,
- 做一些计算并且
- 有时(在大约 10% 到最多 50% 的情况下,即每个 运行 10000 到 50000 次之间)将它们的结果添加到
output
集合中。
处理完 input
中的所有项目后,另一个例程接受 output
并用它做一些事情。
我需要线程池以便尽快处理 input
。 input
的每个元素都应该恰好处理一次。
处理元素的顺序无关紧要(对于 input
和 output
)。 output
是只写的——工作人员只会在那里写,不会对 output
.
问题有两部分,其中线程安全很重要:
- 工作线程需要确保当工作线程 A 处理
input
的某个元素时,其他工作线程会注意到它并且不会处理相同的元素。 - 工作人员完成对元素的处理后,应将结果添加到
output
集合中。
问题:
- 我可以安全地将哪种集合类型用于
input
集合(ConcurrentLinkedQueue?)? - 我可以对
output
集合使用正常的LinkedList
吗(如果 2 个线程同时尝试向列表中添加不同的对象,是否会出现其中一个对象不添加的情况?得救)?
CLQ 适用于给定您的约束的输入,在轮询 size() 以检查输入的终止时要小心:如文档中所述,它不是恒定时间操作。
对于输出,我怀疑 LinkedList 是线程安全的,即使只是为了添加。添加意味着改变头节点的状态,如果两个线程同时添加,这可能会产生问题和分离元素。
您可以使用另一个 CLQ 或 LinkedBlockingDeque 。还有一个更简单的SynchronizedLinkedList。
你没有提到 Java 8,但这是新的 Java 8 并行流库的经典应用:
Collection<Item> input = ... ;
List<Result> output = input.parallelStream()
.map(Item::computeResult)
.filter(Result::matches)
.collect(Collectors.toList());
结果列表是 ArrayList
(尽管这在未来可能会改变)。即使多个线程正在执行处理,这仍然有效,但 ArrayList
不是 thread-safe。这是如何工作的?
它起作用的原因是每个线程都将结果插入到它自己的包含中间结果的列表实例中。最后,中间结果被组合成一个输出列表。这避免了多个线程同时将结果写入输出列表时可能发生的潜在争用。
这里使用的原则是thread-confinement(Goetz,第 3.3 节)。在 multi-threaded 环境中使用非 thread-safe 数据结构是安全的,只要一次 一个线程 可以访问它,并且数据在线程之间安全传递。
如果您未使用 Java 8,则可以使用 fork-join 框架(请参阅 Lea)对 multi-threading 使用一些相同的技术在 Java 7 中引入。它不如 Java 8 Streams 方便。 (实际上,Java 8 Streams 是建立在 Fork/Join 框架之上的。)当然,您将不得不做更多的工作,并且您将无法获得 lambda 的便利。但它确实提供了一种相当方便的方法来构建易于拆分的计算。
关键是构建您的计算,以便块可以表示为 RecursiveTask
。通常,递归任务包含对输入数据结构的引用、数组或列表索引的范围以及存储中间结果的位置。通过拆分索引范围,可以轻松拆分 ("forked") 一个任务。每个分叉任务加入后,其中间结果可以与该任务的中间结果合并。这是以 thread-confined 方式完成的(join 操作处理线程之间的正确切换)。此外,组合阶段也是并行发生的,因为不同线程组合来自计算树不同部分的结果都可以并行进行。
参考文献
Goetz 等。阿尔。 Java 并发实践。 版权所有 2006 Pearson Education
莉亚,道格。 A Java Fork/Join 框架。 Java Grande 上的 ACM 2000 会议论文集。 http://gee.cs.oswego.edu/dl/papers/fj.pdf