如何以正确的顺序执行 Runnable/Thread

How to execute Runnable/Thread in correct sequence

想象这样一个数据流

A,A,B,A,C,C,C,A,B,A,A,A,B...

现在假设我们有一个 StreamProcessor 来处理流。我们可以并行处理 A、B、C,但必须依次处理单个 A、B、C。

示例:
线程 1:按顺序处理所有 As
线程2:依次处理所有的B 等等...

所以对于 A、B、C,我有一个 StreamProcessor (SP)。

每个流元素都有一个时间戳,因此可以按时间排序(它实际上以正确的顺序出现)。元素必须按时间顺序处理。

所以现在我将所有流元素拆分到它们的处理器(SPA、SPB、SPC)。

我在添加元素的所有 SP 中都有一个 TreeSet。

所以每当有新元素出现时,我基本上都会这样做:

 public synchronized void onNewElementReceived(Element element) {
        if (element== null) return;
        treeSet.add(element);
        if(treeSet.size()>30) logger.warn("There are many elements queueing up for processing");
        threadPool.execute(() -> process(treeSet.first()));
    }

private synchronized void process(Element element){
    //Do the processing
}

如果流足够慢以致进程在存在下一个元素之前终止,则此方法工作正常。但如果不是呢?如果有更多元素出现,我如何确保下一个元素也是下一个要处理的元素?到底是操作系统决定什么时候触发哪个Thread?

编辑: 为清楚起见,举一个失败的例子:

假设 process() 个 A 元素需要 1 秒来执行。现在,如果流提供 As 速度更快,那么我们可以处理它们,我们的 treeSet 将填充 A 类型的元素(我刚刚意识到它不是因为我们立即再次获取它,嗯另一个问题)无论如何主要问题仍然存在。例如,如果我们每 100 毫秒接收一次元素,我们将请求 process 方法执行 10 次,但顺序将不再得到保证,因为我们不知道系统将首先执行哪个 Runnable。我们只是以正确的顺序添加它们,但如何以正确的顺序执行它们?

我可以想象只有 运行 一个循环线程一直在获取队列的第一个元素,如果有 none 则中止该过程。这是一个好方法吗?

我会这样做(类似伪代码):

abstract class StreamProcessor extends Thread{
  private ThreadSafeList<Element> elements;
  void add(Element e) {
    elements.addAtEnd(e);
  }
  @Override
  public void run() {
    while(hasNotFinished()) {
      //If list has element, return the first element and remove it from the list, otherwise block until one is there and then return the first element and remove it.
      Element e = elements.blockingRemoveFirst();
      this.workWith(e);
    }
  }
  abstract void workWith(Element e);
}

class StreamProcessorA extends StreamProcessor {
  @Override
  public void workWith(Element e) {
    //Do something
  }
}
class StreamProcessorB extends StreamProcessor {
  @Override
  public void workWith(Element e) {
    //Do something
  }
}
class StreamProcessorC extends StreamProcessor {
  @Override
  public void workWith(Element e) {
    //Do something
  }
}

class ElementReceiver {
  private StreamProcessor A;
  private StreamProcessor B;
  private StreamProcessor C;

  public synchronized void onNewElementReceived(Element e) {
    if(e.type() /*Whatever*/ == ElementType.A) {
       A.add(e);
    }else if(e.type() == ElementType.B) {
       B.add(e);
    }else {
       C.add(e);
    }
  }
}

这段代码由四个线程组成。

第一个线程从某个未指定的数据源接收元素。

如果此线程收到一个,它会检查它是什么类型(A、B 或 C)。

这些类型中的每一种都有对应的 StreamProcessor。 onNewElementReceived 会将接收到的元素添加到对应的 StreamProcessor 的工作集中。

这些 StreamProcessor 线程中的每一个都会进行检查,直到它们被杀死并阻塞,直到它获得一个元素,然后调用必须由每个子类实现的方法 workWith