Can you rebalance an unbalanced Spliterator of unknown size?
I want to use a Stream to parallelize processing of a heterogeneous set of remotely stored JSON files of unknown number (the number of files is not known up front). The files can vary widely in size, from 1 JSON record per file up to 100,000 records in some other files. A JSON record in this case means a self-contained JSON object represented as one line in the file.
I really want to use Streams for this, so I implemented this Spliterator:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    // JsonStreamSupport is the asker's own helper type (not shown in the question).
    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            // While whole files remain, split off a spliterator for the next file.
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };
        }
        else {
            // No files left: split off a small batch of records from the current file.
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}
The problem I'm having is that, although the Stream parallelizes beautifully at first, eventually the largest file is left being processed in a single thread. I believe the proximate cause is well documented: the spliterator is "unbalanced".
More concretely, it seems the trySplit method stops being called after a certain point in the Stream.forEach lifecycle, so the extra logic at the end of trySplit that hands out small batches is rarely executed.
Notice how all the spliterators returned from trySplit share the same paths iterator. I thought this was a pretty clever way to balance the work across all spliterators, but it hasn't been enough to achieve full parallelism.
I would like the parallel processing to proceed first across files, and then, when only a few large files are still left being split, to parallelize across chunks of the remaining files. That is the intent of the else block at the end of trySplit.
Is there an easy/canonical way around this problem?
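For reference, here is a minimal sketch of how a spliterator like this is typically turned into a parallel stream; the driver class and method names are illustrative assumptions, not code from the question:
import java.util.stream.StreamSupport;

// Hypothetical driver: turns the spliterator into a parallel Stream, whose
// forEach is where the framework calls trySplit to hand work to other threads.
public class JsonStreamDriver {
    public static <METADATA, RECORD> void processAll(JsonStreamSpliterator<METADATA, RECORD> spliterator) {
        StreamSupport.stream(spliterator, true) // 'true' requests a parallel stream
            .forEach(record -> {
                // handle each parsed RECORD here
            });
    }
}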
Your trySplit should output splits of equal size, regardless of the size of the underlying files. You should treat all the files as a single unit and fill up the ArrayList-backed spliterator with the same number of JSON objects each time. The number of objects should be such that processing one split takes between 1 and 10 milliseconds: below 1 ms you start approaching the cost of handing a batch off to a worker thread, and above that you start risking uneven CPU load due to tasks that are too coarse-grained.
The spliterator is not obliged to report a size estimate, and you are already doing this correctly: your estimate is Long.MAX_VALUE, which is a special value meaning "unbounded". However, if you have many files that each contain a single JSON object, resulting in batches of size 1, that will hurt your performance in two ways: the overhead of opening, reading, and closing files may become a bottleneck, and, if you manage to avoid that, the cost of the thread handoff may be significant compared to the cost of processing one item, again causing a bottleneck.
I was solving a similar problem five years ago; you can have a look at my solution.
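A minimal sketch of the fixed-batch idea described above (the class name and batch handling are assumptions introduced here for illustration; this is not the linked solution verbatim):
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

// Wraps an arbitrary source spliterator and always splits off fixed-size,
// ArrayList-backed batches, regardless of where file boundaries fall.
public class FixedBatchSpliterator<T> extends AbstractSpliterator<T> {
    private final Spliterator<T> source;
    private final int batchSize; // tune so that processing one batch takes roughly 1-10 ms

    public FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        super(Long.MAX_VALUE, source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.tryAdvance(batch::add)) {
            // keep filling the batch from the source
        }
        return batch.isEmpty() ? null : batch.spliterator();
    }
}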
After much experimentation, I was still not able to get any added parallelism by playing with the size estimates. Basically, any value other than Long.MAX_VALUE tends to cause the spliterator to terminate too early (and without any splitting), while on the other hand a Long.MAX_VALUE estimate causes trySplit to be called relentlessly until it returns null.
The solution I found is to share resources among the spliterators and let them rebalance amongst themselves.
Working code:
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Function;

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;

        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;

        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;
    private final Function<S3ObjectSummary, InputStream> fileOpener;
    private final Function<InputStream, LINE> lineReader;
    // Shared among all spliterators obtained from trySplit, so work can be rebalanced between them:
    private final Deque<S3ObjectSummary> unopenedFiles;
    private final Deque<InputStreamHandler> openedFiles;
    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;
    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput = sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        // Refill the shared buffer with up to maxBuffer lines from the current file.
        for(int i = 0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                // File exhausted: retry, which drains the buffer or picks up another file.
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
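For completeness, here is a minimal sketch of how this spliterator might be wired into a parallel stream. The driver class, the action parameter, and the use of the AWS SDK v1 client are assumptions for illustration, not part of the answer above:
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.InputStream;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.StreamSupport;

// Hypothetical driver showing how the shared-resource spliterator could be consumed.
public class AwsS3LineStreamDriver {

    public static <LINE> void forEachLine(AmazonS3 s3,
                                          Iterator<S3ObjectSummary> summaries,
                                          Function<InputStream, LINE> lineReader,
                                          Consumer<AwsS3LineSpliterator.AwsS3LineInput<LINE>> action) {
        AwsS3LineSpliterator<LINE> spliterator = new AwsS3LineSpliterator<>(
                summaries,
                // Open each listed object as a raw InputStream (AWS SDK v1).
                summary -> s3.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(),
                lineReader,
                100); // lines buffered per refill of the shared buffer
        // 'true' requests a parallel stream; the shared deques let the resulting
        // spliterators rebalance work across worker threads.
        StreamSupport.stream(spliterator, true).forEach(action);
    }
}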
This is not a direct answer to your question, but I think it is worth trying Stream in the library abacus-common:
void test_58601518() throws Exception {
    final File tempDir = new File("./temp/");

    // Prepare the test files:
    // if (!(tempDir.exists() && tempDir.isDirectory())) {
    //     tempDir.mkdirs();
    // }
    //
    // final Random rand = new Random();
    // final int fileCount = 1000;
    //
    // for (int i = 0; i < fileCount; i++) {
    //     List<String> lines = Stream.repeat(TestUtil.fill(Account.class), rand.nextInt(1000) * 100 + 1).map(it -> N.toJSON(it)).toList();
    //     IOUtil.writeLines(new File("./temp/_" + i + ".json"), lines);
    // }

    N.println("Xmx: " + IOUtil.MAX_MEMORY_IN_MB + " MB");
    N.println("total file size: " + Stream.listFiles(tempDir).mapToLong(IOUtil::sizeOf).sum() / IOUtil.ONE_MB + " MB");

    final AtomicLong counter = new AtomicLong();
    final Consumer<Account> yourAction = it -> {
        counter.incrementAndGet();
        it.toString().replace("a", "bbb");
    };

    long startTime = System.currentTimeMillis();

    Stream.listFiles(tempDir) // the file/data source could be a local or a remote file system.
        .parallel(2) // number of threads used to load the files and convert the lines to Java objects.
        .flatMap(f -> Stream.lines(f).map(line -> N.fromJSON(Account.class, line))) // only a limited number of lines (fewer than 1024) are loaded into memory at a time.
        .parallel(8) // number of threads used to execute your action.
        .forEach(yourAction);

    N.println("Took: " + (System.currentTimeMillis() - startTime) + " ms to process " + counter + " lines/objects");

    // IOUtil.deleteAllIfExists(tempDir);
}
CPU usage on my laptop stayed fairly high (about 70%) until the end, and it took about 70 seconds to process 51,899,100 lines/objects from 1000 files, with an Intel(R) Core(TM) i5-8365U CPU and Xmx256m of JVM memory. The total file size was about 4524 MB. If yourAction is not a heavy operation, a sequential stream could be faster than a parallel one.
F.Y.I. I'm the developer of abacus-common.