如何以正确的顺序执行 Runnable/Thread
How to execute Runnable/Thread in correct sequence
想象这样一个数据流
A,A,B,A,C,C,C,A,B,A,A,A,B...
现在假设我们有一个 StreamProcessor
来处理流。我们可以并行处理 A、B、C,但必须依次处理单个 A、B、C。
示例:
线程 1:按顺序处理所有 As
线程2:依次处理所有的B
等等...
所以对于 A、B、C,我有一个 StreamProcessor (SP)。
每个流元素都有一个时间戳,因此可以按时间排序(它实际上以正确的顺序出现)。元素必须按时间顺序处理。
所以现在我将所有流元素拆分到它们的处理器(SPA、SPB、SPC)。
我在添加元素的所有 SP 中都有一个 TreeSet。
所以每当有新元素出现时,我基本上都会这样做:
public synchronized void onNewElementReceived(Element element) {
if (element== null) return;
treeSet.add(element);
if(treeSet.size()>30) logger.warn("There are many elements queueing up for processing");
threadPool.execute(() -> process(treeSet.first()));
}
private synchronized void process(Element element){
//Do the processing
}
如果流足够慢以致进程在存在下一个元素之前终止,则此方法工作正常。但如果不是呢?如果有更多元素出现,我如何确保下一个元素也是下一个要处理的元素?到底是操作系统决定什么时候触发哪个Thread?
编辑: 为清楚起见,举一个失败的例子:
假设 process()
个 A 元素需要 1 秒来执行。现在,如果流提供 As 速度更快,那么我们可以处理它们,我们的 treeSet 将填充 A 类型的元素(我刚刚意识到它不是因为我们立即再次获取它,嗯另一个问题)无论如何主要问题仍然存在。例如,如果我们每 100 毫秒接收一次元素,我们将请求 process
方法执行 10 次,但顺序将不再得到保证,因为我们不知道系统将首先执行哪个 Runnable。我们只是以正确的顺序添加它们,但如何以正确的顺序执行它们?
我可以想象只有 运行 一个循环线程一直在获取队列的第一个元素,如果有 none 则中止该过程。这是一个好方法吗?
我会这样做(类似伪代码):
abstract class StreamProcessor extends Thread{
private ThreadSafeList<Element> elements;
void add(Element e) {
elements.addAtEnd(e);
}
@Override
public void run() {
while(hasNotFinished()) {
//If list has element, return the first element and remove it from the list, otherwise block until one is there and then return the first element and remove it.
Element e = elements.blockingRemoveFirst();
this.workWith(e);
}
}
abstract void workWith(Element e);
}
class StreamProcessorA extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class StreamProcessorB extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class StreamProcessorC extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class ElementReceiver {
private StreamProcessor A;
private StreamProcessor B;
private StreamProcessor C;
public synchronized void onNewElementReceived(Element e) {
if(e.type() /*Whatever*/ == ElementType.A) {
A.add(e);
}else if(e.type() == ElementType.B) {
B.add(e);
}else {
C.add(e);
}
}
}
这段代码由四个线程组成。
第一个线程从某个未指定的数据源接收元素。
如果此线程收到一个,它会检查它是什么类型(A、B 或 C)。
这些类型中的每一种都有对应的 StreamProcessor。 onNewElementReceived 会将接收到的元素添加到对应的 StreamProcessor 的工作集中。
这些 StreamProcessor 线程中的每一个都会进行检查,直到它们被杀死并阻塞,直到它获得一个元素,然后调用必须由每个子类实现的方法 workWith
。
想象这样一个数据流
A,A,B,A,C,C,C,A,B,A,A,A,B...
现在假设我们有一个 StreamProcessor
来处理流。我们可以并行处理 A、B、C,但必须依次处理单个 A、B、C。
示例:
线程 1:按顺序处理所有 As
线程2:依次处理所有的B
等等...
所以对于 A、B、C,我有一个 StreamProcessor (SP)。
每个流元素都有一个时间戳,因此可以按时间排序(它实际上以正确的顺序出现)。元素必须按时间顺序处理。
所以现在我将所有流元素拆分到它们的处理器(SPA、SPB、SPC)。
我在添加元素的所有 SP 中都有一个 TreeSet。
所以每当有新元素出现时,我基本上都会这样做:
public synchronized void onNewElementReceived(Element element) {
if (element== null) return;
treeSet.add(element);
if(treeSet.size()>30) logger.warn("There are many elements queueing up for processing");
threadPool.execute(() -> process(treeSet.first()));
}
private synchronized void process(Element element){
//Do the processing
}
如果流足够慢以致进程在存在下一个元素之前终止,则此方法工作正常。但如果不是呢?如果有更多元素出现,我如何确保下一个元素也是下一个要处理的元素?到底是操作系统决定什么时候触发哪个Thread?
编辑: 为清楚起见,举一个失败的例子:
假设 process()
个 A 元素需要 1 秒来执行。现在,如果流提供 As 速度更快,那么我们可以处理它们,我们的 treeSet 将填充 A 类型的元素(我刚刚意识到它不是因为我们立即再次获取它,嗯另一个问题)无论如何主要问题仍然存在。例如,如果我们每 100 毫秒接收一次元素,我们将请求 process
方法执行 10 次,但顺序将不再得到保证,因为我们不知道系统将首先执行哪个 Runnable。我们只是以正确的顺序添加它们,但如何以正确的顺序执行它们?
我可以想象只有 运行 一个循环线程一直在获取队列的第一个元素,如果有 none 则中止该过程。这是一个好方法吗?
我会这样做(类似伪代码):
abstract class StreamProcessor extends Thread{
private ThreadSafeList<Element> elements;
void add(Element e) {
elements.addAtEnd(e);
}
@Override
public void run() {
while(hasNotFinished()) {
//If list has element, return the first element and remove it from the list, otherwise block until one is there and then return the first element and remove it.
Element e = elements.blockingRemoveFirst();
this.workWith(e);
}
}
abstract void workWith(Element e);
}
class StreamProcessorA extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class StreamProcessorB extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class StreamProcessorC extends StreamProcessor {
@Override
public void workWith(Element e) {
//Do something
}
}
class ElementReceiver {
private StreamProcessor A;
private StreamProcessor B;
private StreamProcessor C;
public synchronized void onNewElementReceived(Element e) {
if(e.type() /*Whatever*/ == ElementType.A) {
A.add(e);
}else if(e.type() == ElementType.B) {
B.add(e);
}else {
C.add(e);
}
}
}
这段代码由四个线程组成。
第一个线程从某个未指定的数据源接收元素。
如果此线程收到一个,它会检查它是什么类型(A、B 或 C)。
这些类型中的每一种都有对应的 StreamProcessor。 onNewElementReceived 会将接收到的元素添加到对应的 StreamProcessor 的工作集中。
这些 StreamProcessor 线程中的每一个都会进行检查,直到它们被杀死并阻塞,直到它获得一个元素,然后调用必须由每个子类实现的方法 workWith
。