如何控制子线程生命周期并与主线程同步?
How to control child-thread lifecycle and synchonize it with main-thread?
我正在尝试创建另一个线程来处理数据,同时主线程做更多的事情。主线程必须等到另一个线程完成 doStuff
的所有元素。
我的实现非常简单。
请看一下 processData
并告诉我是否还有其他类似 Java 的方法?
我读到了 Phaser
,但仍然无法想象如何使用它或我还能尝试什么?
public class MyClass {
private final NodeQueue queue;
MyClass() {
queue = new NodeQueue();
}
public void processData(Set<String> dataSet) {
// allow transfer
queue.transferEnable()
Thread transfer = new Thread(() -> {
queue.transferData();
})
transfer.start();
// doStuff in another thread
for (String element : dataSet) {
queue.add(element);
// do something more
}
// stop transfer
queue.waitTillEmptyQueue();
queue.transferDisable();
try {
transfer.join();
} catch (...) {
// catch
}
}
public class NodeQueue {
private final ConcurrentLinkedQueue<String> queue;
private boolean transferEnabled;
protected NodeQueue() {
queue = new ConcurrentLinkedQueue<>();
transferEnabled = true;
}
protected void transfer() {
while (!queue.isEmpty()) {
doStuff(queue.poll());
}
}
public void transferData() {
while (tranfserEnabled) {
transfer();
}
}
public synchronized void transferEnable() {
transferEnabled = true;
}
public synchronized void transferDisable() {
transferEnabled = false;
}
public void add(String s) {
queue.add(s);
}
public synchronized void waitTillEmptyQueue() {
while (!queue.isEmpty()) {
if (queue.isEmpty()) {
break;
}
}
}
}
}
让我复制 my own post
中的 Phaser
示例
主线程
// Add producer as a party
Phaser phaser = new Phaser(1);
for (int i=0; i<10000; ++i) {
// Add each task as a party
phaser.register();
queue.put(new Task());
}
// Producer arrived and wait for completion of all tasks
phaser.arriveAndAwaitAdvance();
// At the end, there is only 1 party left which is the producer itself
消费者
while (true) {
Task task = queue.take();
processTask(task);
// Task completed and remove itself as a party
phaser.arriveAndDeregister();
}
我正在尝试创建另一个线程来处理数据,同时主线程做更多的事情。主线程必须等到另一个线程完成 doStuff
的所有元素。
我的实现非常简单。
请看一下 processData
并告诉我是否还有其他类似 Java 的方法?
我读到了 Phaser
,但仍然无法想象如何使用它或我还能尝试什么?
public class MyClass {
private final NodeQueue queue;
MyClass() {
queue = new NodeQueue();
}
public void processData(Set<String> dataSet) {
// allow transfer
queue.transferEnable()
Thread transfer = new Thread(() -> {
queue.transferData();
})
transfer.start();
// doStuff in another thread
for (String element : dataSet) {
queue.add(element);
// do something more
}
// stop transfer
queue.waitTillEmptyQueue();
queue.transferDisable();
try {
transfer.join();
} catch (...) {
// catch
}
}
public class NodeQueue {
private final ConcurrentLinkedQueue<String> queue;
private boolean transferEnabled;
protected NodeQueue() {
queue = new ConcurrentLinkedQueue<>();
transferEnabled = true;
}
protected void transfer() {
while (!queue.isEmpty()) {
doStuff(queue.poll());
}
}
public void transferData() {
while (tranfserEnabled) {
transfer();
}
}
public synchronized void transferEnable() {
transferEnabled = true;
}
public synchronized void transferDisable() {
transferEnabled = false;
}
public void add(String s) {
queue.add(s);
}
public synchronized void waitTillEmptyQueue() {
while (!queue.isEmpty()) {
if (queue.isEmpty()) {
break;
}
}
}
}
}
让我复制 my own post
中的Phaser
示例
主线程
// Add producer as a party
Phaser phaser = new Phaser(1);
for (int i=0; i<10000; ++i) {
// Add each task as a party
phaser.register();
queue.put(new Task());
}
// Producer arrived and wait for completion of all tasks
phaser.arriveAndAwaitAdvance();
// At the end, there is only 1 party left which is the producer itself
消费者
while (true) {
Task task = queue.take();
processTask(task);
// Task completed and remove itself as a party
phaser.arriveAndDeregister();
}