How to control a child thread's lifecycle and synchronize it with the main thread?

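I am trying to create another thread that processes data while the main thread does some more work. The main thread must wait until the other thread has finished doStuff for all elements. My implementation is pretty straightforward.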

Please take a look at processData and tell me whether there is a more Java-like way to do this.

I have read about Phaser, but I still can't picture how to use it, or what else I could try.

import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MyClass {
    private final NodeQueue queue;
    
    MyClass() {
        queue = new NodeQueue();
    }
    
    public void processData(Set<String> dataSet) {
        // allow transfer
        queue.transferEnable();
        Thread transfer = new Thread(() -> {
            queue.transferData();
        });
        transfer.start();
        
        // doStuff in another thread
        for (String element : dataSet) {
            queue.add(element);
            // do something more
        }
        
        // stop transfer
        queue.waitTillEmptyQueue();
        queue.transferDisable();
        try {
            transfer.join();
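        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }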
    }
    
    
    
    public class NodeQueue {
        private final ConcurrentLinkedQueue<String> queue;
        
        // volatile so the transfer thread sees changes made by the main thread
        private volatile boolean transferEnabled;
        
        protected NodeQueue() {
            queue = new ConcurrentLinkedQueue<>();
            transferEnabled = true;
        }
        
        protected void transfer() {
            while (!queue.isEmpty()) {
                doStuff(queue.poll());
            }
        }
        
        public void transferData() {
            while (transferEnabled) {
                transfer();
            }
        }
        
        public synchronized void transferEnable() {
            transferEnabled = true;
        }
        
        public synchronized void transferDisable() {
            transferEnabled = false;
        }
        
        public void add(String s) {
            queue.add(s);
        }
        
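        public synchronized void waitTillEmptyQueue() {
            // Busy-wait until the queue is drained. The element polled last may
            // still be inside doStuff when this returns.
            while (!queue.isEmpty()) {
                Thread.onSpinWait(); // spin hint, Java 9+
            }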
    }
}

Let me copy the Phaser example from my own post.

Main thread

// Add producer as a party
Phaser phaser = new Phaser(1);

for (int i=0; i<10000; ++i) {
  // Add each task as a party
  phaser.register();
  queue.put(new Task());
}

// Producer arrives and waits for all tasks to complete
phaser.arriveAndAwaitAdvance();

// At the end, there is only 1 party left which is the producer itself

Consumer

while (true) {
  Task task = queue.take();
  processTask(task);
  // Task completed and remove itself as a party
  phaser.arriveAndDeregister();
}
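
To see how the two snippets fit together, here is a minimal self-contained sketch. It is not the code from the question: the class name PhaserDemo, the method processAll, the AtomicInteger counter, and the use of LinkedBlockingQueue with String tasks are all assumptions made for illustration, and processTask is only a stand-in for the real work (doStuff in the question). Stopping the consumer with interrupt() is also just one possible choice.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserDemo {

    // Hypothetical stand-in for the real work (doStuff / processTask).
    static void processTask(String task, AtomicInteger processed) {
        processed.incrementAndGet();
    }

    // Hands every task to a consumer thread and returns only after all
    // of them have been processed. Returns the number of processed tasks.
    static int processAll(List<String> tasks) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        // One party for the producer (the calling thread).
        Phaser phaser = new Phaser(1);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String task = queue.take();
                    try {
                        processTask(task, processed);
                    } finally {
                        // Task done (or failed): arrive and drop its party.
                        phaser.arriveAndDeregister();
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: leave the loop.
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (String task : tasks) {
            // Register a party BEFORE the task becomes visible to the consumer.
            // Note: a Phaser supports at most 65535 registered parties.
            phaser.register();
            queue.add(task);
        }

        // The producer arrives and blocks until every task party has arrived.
        phaser.arriveAndAwaitAdvance();

        // All tasks are done, so the consumer can be stopped.
        consumer.interrupt();
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

The order inside the producer loop matters: each task is registered before it is put on the queue, so the consumer can never call arriveAndDeregister for a party that has not been registered yet. Because the producer itself has not arrived until arriveAndAwaitAdvance, the phase cannot advance while tasks are still being added.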