Java 8 批处理流
Java 8 Stream with batch processing
我有一个包含项目列表的大文件。
我想创建一批项目,用这批项目发出 HTTP 请求(所有项目都需要作为 HTTP 请求中的参数)。我可以用 for
循环很容易地做到这一点,但作为 Java 8 爱好者,我想尝试用 Java 8 的 Stream 框架编写它(并获得延迟处理的好处)。
示例:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
我想做一些长期的事情
lazyFileStream.group(500).map(processBatch).collect(toList())
最好的方法是什么?
注意! 此解决方案在 运行 forEach 之前读取整个文件。
您可以使用 jOOλ 来完成,这是一个扩展 Java 8 个流的库,用于单线程、顺序流用例:
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
在幕后,zipWithIndex()
只是:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
... 而 groupBy()
是 API 方便:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(免责声明:我在 jOOλ 背后的公司工作)
纯 Java-8 实现也是可能的:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
请注意,与 JOOl 不同,它可以很好地并行工作(前提是您的 data
是随机访问列表)。
你也可以看看cyclops-react,我是这个库的作者。它实现了 jOOλ 接口(并通过扩展 JDK 8 Streams),但与 JDK 8 Parallel Streams 不同,它专注于异步操作(例如可能阻塞异步 I/O 调用)。 JDK Parallel Streams,相比之下,侧重于 CPU 绑定操作的数据并行性。它通过在后台管理基于 Future 的任务的聚合来工作,但向最终用户提供标准的扩展 Stream API。
此示例代码可以帮助您入门
LazyFutureStream.parallelCommonBuilder()
.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
还有一个more general Tutorial here
要使用您自己的线程池(这可能更适合阻塞 I/O),您可以使用
开始处理
LazyReact reactor = new LazyReact(40);
reactor.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
您也可以使用 RxJava:
RxJava v3:
int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
.buffer(batchSize)
.map(batch -> process(batch))
.blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));
以前的版本:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
或
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
或
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
为了完整起见,这里有一个 Guava 解决方案。
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
在问题中,集合可用,因此不需要流,可以写成,
Iterables.partition(data, batchSize).forEach(this::process);
纯Java8解:
我们可以创建一个自定义收集器来优雅地执行此操作,它接受一个 batch size
和一个 Consumer
来处理每个批次:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
然后可选择创建一个辅助实用程序 class:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
用法示例:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
我也在GitHub上发布了我的代码,如果有人想看的话:
我为这样的场景写了一个自定义的 Spliterator。它将从输入流中填充给定大小的列表。这种方式的优点是会进行惰性处理,并且会和其他流函数一起工作。
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
我们有一个类似的问题需要解决。我们想要采用大于系统内存的流(遍历数据库中的所有对象)并尽可能随机化顺序 - 我们认为缓冲 10,000 个项目并随机化它们是可以的。
目标是接收流的函数。
在此处提出的解决方案中,似乎有多种选择:
- 使用各种非java 8 个附加库
- 从不是流的内容开始 - 例如随机访问列表
- 有一个可以在拆分器中轻松拆分的流
我们最初的直觉是使用自定义收集器,但这意味着放弃流式处理。上面的自定义收集器解决方案很好,我们差点就用上了。
这是一个利用 Stream
s 可以给你一个 Iterator
的事实作弊的解决方案,你可以将其用作 逃生口 让你做了一些流不支持的额外事情。使用另一位 Java 8 StreamSupport
魔法将 Iterator
转换回流。
/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
使用它的一个简单示例如下所示:
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
以上打印
[A, B, C]
[D, E, F]
对于我们的用例,我们想要打乱批次,然后将它们作为流保存 - 它看起来像这样:
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
这输出类似(它是随机的,每次都不同)
A
C
B
E
D
F
这里的秘诀在于始终存在一个流,因此您可以对批处理流进行操作,或者对每个批处理做一些事情,然后 flatMap
将其返回到流。更好的是,以上所有仅 运行s 作为最终的 forEach
或 collect
或其他终止表达式 PULL 通过流的数据。
事实证明,iterator
是一种特殊类型的终止操作,不会导致整个流运行并进入记忆!感谢 Java 8 个人的出色设计!
使用 Spliterator 的简单示例
// read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
//skip header
Spliterator<String> split = stream.skip(1).spliterator();
Chunker<String> chunker = new Chunker<String>();
while(true) {
boolean more = split.tryAdvance(chunker::doSomething);
if (!more) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Chunker<T> {
int ct = 0;
public void doSomething(T line) {
System.out.println(ct++ + " " + line.toString());
if (ct % 100 == 0) {
System.out.println("====================chunk=====================");
}
}
}
Bruce 的回答更全面,但我一直在寻找快速而肮脏的方法来处理一堆文件。
纯 Java 8 示例也适用于并行流。
使用方法:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
方法声明与实现:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
List<ElementType> newBatch = new ArrayList<>(batchSize);
stream.forEach(element -> {
List<ElementType> fullBatch;
synchronized (newBatch)
{
if (newBatch.size() < batchSize)
{
newBatch.add(element);
return;
}
else
{
fullBatch = new ArrayList<>(newBatch);
newBatch.clear();
newBatch.add(element);
}
}
batchProcessor.accept(fullBatch);
});
if (newBatch.size() > 0)
batchProcessor.accept(new ArrayList<>(newBatch));
}
这是一个纯粹的 java 延迟计算的解决方案。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
使用 Java 8
和 com.google.common.collect.Lists
,您可以执行以下操作:
public class BatchProcessingUtil {
public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
List<List<T>> batches = Lists.partition(data, batchSize);
return batches.stream()
.map(processFunction) // Send each batch to the process function
.flatMap(Collection::stream) // flat results to gather them in 1 stream
.collect(Collectors.toList());
}
}
这里T
是输入列表中项目的类型,U
是输出列表中项目的类型
你可以这样使用它:
List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
userKeys,
10, // Batch Size
partialKeys -> service.getUsers(partialKeys)
);
您可以使用 apache.commons :
ListUtils.partition(ListOfLines, 500).stream()
.map(partition -> processBatch(partition)
.collect(Collectors.toList());
分区部分已完成 un-lazily 但在列表分区后,您将获得使用流的好处(例如使用并行流、添加过滤器等)。
其他答案提出了更详尽的解决方案,但有时可读性和可维护性更重要(有时它们不是:-))
使用
Reactor:
Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
.map(line -> someProcessingOfSingleLine(line))
.buffer(BUFFER_SIZE)
.subscribe(apiService::makeHttpRequest);
平心而论,看看优雅的Vavr解决方案:
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
我有一个包含项目列表的大文件。
我想创建一批项目,用这批项目发出 HTTP 请求(所有项目都需要作为 HTTP 请求中的参数)。我可以用 for
循环很容易地做到这一点,但作为 Java 8 爱好者,我想尝试用 Java 8 的 Stream 框架编写它(并获得延迟处理的好处)。
示例:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
我想做一些长期的事情
lazyFileStream.group(500).map(processBatch).collect(toList())
最好的方法是什么?
注意! 此解决方案在 运行 forEach 之前读取整个文件。
您可以使用 jOOλ 来完成,这是一个扩展 Java 8 个流的库,用于单线程、顺序流用例:
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
在幕后,zipWithIndex()
只是:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
... 而 groupBy()
是 API 方便:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(免责声明:我在 jOOλ 背后的公司工作)
纯 Java-8 实现也是可能的:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
请注意,与 JOOl 不同,它可以很好地并行工作(前提是您的 data
是随机访问列表)。
你也可以看看cyclops-react,我是这个库的作者。它实现了 jOOλ 接口(并通过扩展 JDK 8 Streams),但与 JDK 8 Parallel Streams 不同,它专注于异步操作(例如可能阻塞异步 I/O 调用)。 JDK Parallel Streams,相比之下,侧重于 CPU 绑定操作的数据并行性。它通过在后台管理基于 Future 的任务的聚合来工作,但向最终用户提供标准的扩展 Stream API。
此示例代码可以帮助您入门
LazyFutureStream.parallelCommonBuilder()
.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
还有一个more general Tutorial here
要使用您自己的线程池(这可能更适合阻塞 I/O),您可以使用
开始处理 LazyReact reactor = new LazyReact(40);
reactor.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
您也可以使用 RxJava:
RxJava v3:
int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
.buffer(batchSize)
.map(batch -> process(batch))
.blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));
以前的版本:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
或
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
或
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
为了完整起见,这里有一个 Guava 解决方案。
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
在问题中,集合可用,因此不需要流,可以写成,
Iterables.partition(data, batchSize).forEach(this::process);
纯Java8解:
我们可以创建一个自定义收集器来优雅地执行此操作,它接受一个 batch size
和一个 Consumer
来处理每个批次:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
然后可选择创建一个辅助实用程序 class:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
用法示例:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
我也在GitHub上发布了我的代码,如果有人想看的话:
我为这样的场景写了一个自定义的 Spliterator。它将从输入流中填充给定大小的列表。这种方式的优点是会进行惰性处理,并且会和其他流函数一起工作。
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
我们有一个类似的问题需要解决。我们想要采用大于系统内存的流(遍历数据库中的所有对象)并尽可能随机化顺序 - 我们认为缓冲 10,000 个项目并随机化它们是可以的。
目标是接收流的函数。
在此处提出的解决方案中,似乎有多种选择:
- 使用各种非java 8 个附加库
- 从不是流的内容开始 - 例如随机访问列表
- 有一个可以在拆分器中轻松拆分的流
我们最初的直觉是使用自定义收集器,但这意味着放弃流式处理。上面的自定义收集器解决方案很好,我们差点就用上了。
这是一个利用 Stream
s 可以给你一个 Iterator
的事实作弊的解决方案,你可以将其用作 逃生口 让你做了一些流不支持的额外事情。使用另一位 Java 8 StreamSupport
魔法将 Iterator
转换回流。
/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
使用它的一个简单示例如下所示:
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
以上打印
[A, B, C]
[D, E, F]
对于我们的用例,我们想要打乱批次,然后将它们作为流保存 - 它看起来像这样:
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
这输出类似(它是随机的,每次都不同)
A
C
B
E
D
F
这里的秘诀在于始终存在一个流,因此您可以对批处理流进行操作,或者对每个批处理做一些事情,然后 flatMap
将其返回到流。更好的是,以上所有仅 运行s 作为最终的 forEach
或 collect
或其他终止表达式 PULL 通过流的数据。
事实证明,iterator
是一种特殊类型的终止操作,不会导致整个流运行并进入记忆!感谢 Java 8 个人的出色设计!
使用 Spliterator 的简单示例
// read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
//skip header
Spliterator<String> split = stream.skip(1).spliterator();
Chunker<String> chunker = new Chunker<String>();
while(true) {
boolean more = split.tryAdvance(chunker::doSomething);
if (!more) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Chunker<T> {
int ct = 0;
public void doSomething(T line) {
System.out.println(ct++ + " " + line.toString());
if (ct % 100 == 0) {
System.out.println("====================chunk=====================");
}
}
}
Bruce 的回答更全面,但我一直在寻找快速而肮脏的方法来处理一堆文件。
纯 Java 8 示例也适用于并行流。
使用方法:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
方法声明与实现:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
List<ElementType> newBatch = new ArrayList<>(batchSize);
stream.forEach(element -> {
List<ElementType> fullBatch;
synchronized (newBatch)
{
if (newBatch.size() < batchSize)
{
newBatch.add(element);
return;
}
else
{
fullBatch = new ArrayList<>(newBatch);
newBatch.clear();
newBatch.add(element);
}
}
batchProcessor.accept(fullBatch);
});
if (newBatch.size() > 0)
batchProcessor.accept(new ArrayList<>(newBatch));
}
这是一个纯粹的 java 延迟计算的解决方案。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
使用 Java 8
和 com.google.common.collect.Lists
,您可以执行以下操作:
public class BatchProcessingUtil {
public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
List<List<T>> batches = Lists.partition(data, batchSize);
return batches.stream()
.map(processFunction) // Send each batch to the process function
.flatMap(Collection::stream) // flat results to gather them in 1 stream
.collect(Collectors.toList());
}
}
这里T
是输入列表中项目的类型,U
是输出列表中项目的类型
你可以这样使用它:
List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
userKeys,
10, // Batch Size
partialKeys -> service.getUsers(partialKeys)
);
您可以使用 apache.commons :
ListUtils.partition(ListOfLines, 500).stream()
.map(partition -> processBatch(partition)
.collect(Collectors.toList());
分区部分已完成 un-lazily 但在列表分区后,您将获得使用流的好处(例如使用并行流、添加过滤器等)。 其他答案提出了更详尽的解决方案,但有时可读性和可维护性更重要(有时它们不是:-))
使用 Reactor:
Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
.map(line -> someProcessingOfSingleLine(line))
.buffer(BUFFER_SIZE)
.subscribe(apiService::makeHttpRequest);
平心而论,看看优雅的Vavr解决方案:
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);