如何检查 Java 8 Stream.forEach() 何时完成迭代?
How can you check when a Java 8 Stream.forEach() finishes iterating?
我想使用 Java 8 Stream
提供的并行性,但我还需要按特定顺序执行某些操作,否则一切都会中断。问题是使用流意味着代码现在是异步的 [注意:这是不正确的],而且我无法弄清楚如何让事情发生,只有当它完成对整个集合的迭代时。 [注意:这是自动发生的]
现在,我的代码如下:
public void iterateOverMap(Map<String, String> m)
{
AtomicInteger count = new AtomicInteger(0);
m.keySet().stream().forEach((k) -> {
Object o = m.get(k);
// do stuff with o
count.incrementAndGet();
});
// spin up a new thread to check if the Stream is done
new Thread(() -> {
for (;;)
{
if (count.intValue() >= map.size())
break;
}
afterFinishedIterating();
}).start();
}
我不喜欢为了跟踪这件事而不得不启动一个新线程或阻塞主线程的想法,但我想不出我还能怎么做。有人知道更好的选择吗?
谢谢!
Stream
处理是同步的。
如果您想要一个如何跟踪 Stream
进度的示例,您可以使用 peek()
中间操作,但请记住它最好用于调试目的
例子取自
Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
.map(row -> process(row))
.peek(stat -> {
if (loader.incrementAndGet() % fivePercent == 0) {
System.out.println(loader.get() + " elements on " + elementsCount + " treated");
System.out.println((5*(loader.get() / fivePercent)) + "%");
}
})
.reduce(MyStat::aggregate);
原来我遇到的问题是我使用的线程安全但不确定的迭代“Collections.synchronizedList(new LinkedList<>())
”列表,而不是 Stream
用法。 Stream
并不像我假设的那样是异步的,所以答案只是“您不必这样做;它会为您完成”。
我想使用 Java 8 Stream
提供的并行性,但我还需要按特定顺序执行某些操作,否则一切都会中断。问题是使用流意味着代码现在是异步的 [注意:这是不正确的],而且我无法弄清楚如何让事情发生,只有当它完成对整个集合的迭代时。 [注意:这是自动发生的]
现在,我的代码如下:
public void iterateOverMap(Map<String, String> m)
{
AtomicInteger count = new AtomicInteger(0);
m.keySet().stream().forEach((k) -> {
Object o = m.get(k);
// do stuff with o
count.incrementAndGet();
});
// spin up a new thread to check if the Stream is done
new Thread(() -> {
for (;;)
{
if (count.intValue() >= map.size())
break;
}
afterFinishedIterating();
}).start();
}
我不喜欢为了跟踪这件事而不得不启动一个新线程或阻塞主线程的想法,但我想不出我还能怎么做。有人知道更好的选择吗?
谢谢!
Stream
处理是同步的。
如果您想要一个如何跟踪 Stream
进度的示例,您可以使用 peek()
中间操作,但请记住它最好用于调试目的
例子取自
Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
.map(row -> process(row))
.peek(stat -> {
if (loader.incrementAndGet() % fivePercent == 0) {
System.out.println(loader.get() + " elements on " + elementsCount + " treated");
System.out.println((5*(loader.get() / fivePercent)) + "%");
}
})
.reduce(MyStat::aggregate);
原来我遇到的问题是我使用的线程安全但不确定的迭代“Collections.synchronizedList(new LinkedList<>())
”列表,而不是 Stream
用法。 Stream
并不像我假设的那样是异步的,所以答案只是“您不必这样做;它会为您完成”。