有没有类似 Iterator 的东西,但有像 Streams 这样的功能?
Is there something like an Iterator, but with functions like Streams?
所以基本上我想做的是以下内容:
- 从数据库加载一批数据
- 将该数据(
Object[]
查询结果)映射到 class 以可读格式表示数据
- 写入文件
- 重复直到查询没有更多结果
我列出了我熟悉的似乎符合需要的结构以及为什么它们不符合我的需要。
- 迭代器 → 没有不调用就映射和过滤的选项
next()
- 我需要在 subclass 中定义 map 函数,尽管实际上没有数据(类似于流),这样我就可以将“Stream”方式传递给调用 class 并且只在那里调用
next
,然后调用所有地图函数作为结果
- 流 → 在映射和过滤成为可能之前,所有数据都需要可用
- Observable → 一旦数据可用就发送数据。不过我需要同步处理它
为了更清楚地了解我要做什么,我举了一个小例子:
// Disclaimer: "Something" is the structure I am not sure of now.
// Could be an Iterator or something else that fits (Thats the question)
public class Orchestrator {
@Inject
private DataGetter dataGetter;
public void doWork() {
FileWriter writer = new FileWriter("filename");
// Write the formatted data to the file
dataGetter.getData()
.forEach(data -> writer.writeToFile(data));
}
}
public class FileWriter {
public void writeToFile(List<Thing> data) {
// Write to file
}
}
public class DataGetter {
@Inject
private ThingDao thingDao;
public Something<List<Thing>> getData() {
// Map data to the correct format and return that
return thingDao.getThings()
.map(partialResult -> /* map to object */);
}
}
public class ThingDao {
public Something<List<Object[]>> getThings() {
Query q = ...;
// Dont know what to return
}
}
到目前为止我得到了什么:
我试图从迭代器的基础开始,因为它是唯一真正满足我的内存要求的。然后我添加了一些方法来映射和循环数据。虽然这并不是一个真正可靠的设计,但它会比我想象的更难,所以我想知道是否已经有任何东西可以满足我的需要。
public class QIterator<E> implements Iterator<List<E>> {
public static String QUERY_OFFSET = "queryOffset";
public static String QUERY_LIMIT = "queryLimit";
private Query query;
private long lastResultIndex = 0;
private long batchSize;
private Function<List<Object>, List<E>> mapper;
public QIterator(Query query, long batchSize) {
this.query = query;
this.batchSize = batchSize;
}
public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
this(query, batchSize);
this.mapper = mapper;
}
@Override
public boolean hasNext() {
return lastResultIndex % batchSize == 0;
}
@Override
public List<E> next() {
query.setParameter(QueryIterator.QUERY_OFFSET, lastResultIndex);
query.setParameter(QueryIterator.QUERY_LIMIT, batchSize);
List<Object> result = (List<Object>) query.getResultList(); // unchecked
lastResultIndex += result.size();
List<E> mappedResult;
if (mapper != null) {
mappedResult = mapper.apply(result);
} else {
mappedResult = (List<E>) result; // unchecked
}
return mappedResult;
}
public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
return new QIterator<>(query, batchSize, (data) -> {
if (this.mapper != null) {
return appendingMapper.apply(this.mapper.apply(data));
} else {
return appendingMapper.apply((List<E>) data);
}
});
}
public void forEach(BiConsumer<List<E>, Integer> consumer) {
for (int i = 0; this.hasNext(); i++) {
consumer.accept(this.next(), i);
}
}
}
目前为止这是有效的,但是有一些我不太喜欢的 unchecked
作业,而且我希望能够将一个 QIterator “附加”到另一个 QIterator 本身并不难,但是它也应该采用追加之后的地图。
假设您有一个以分页方式提供数据的 DAO,例如通过将 LIMIT
和 OFFSET
子句应用于基础 SQL。这样的 DAO class 将有一个方法将这些值作为参数,即该方法将符合以下功能方法:
@FunctionalInterface
public interface PagedDao<T> {
List<T> getData(int offset, int limit);
}
例如调用 getData(0, 20)
会 return 前 20 行(第 1 页),调用 getData(60, 20)
会 return 第 4 页的 20 行。如果方法 return 少超过 20 行,这意味着我们得到了最后一页。在最后一行之后请求数据将 return 一个空列表。
对于下面的演示,我们可以模拟这样一个 DAO class:
public class MockDao {
private final int rowCount;
public MockDao(int rowCount) {
this.rowCount = rowCount;
}
public List<SimpleRow> getSimpleRows(int offset, int limit) {
System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
if (offset < 0 || limit <= 0)
throw new IllegalArgumentException();
List<SimpleRow> data = new ArrayList<>();
for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
data.add(new SimpleRow("Row #" + rowNo));
System.out.println("DEBUG: data = " + data);
return data;
}
}
public class SimpleRow {
private final String data;
public SimpleRow(String data) {
this.data = data;
}
@Override
public String toString() {
return "Row[data=" + this.data + "]";
}
}
如果你想生成一个Stream
of rows from that method, streaming all rows in blocks of a certain size, we need a Spliterator
for that, so we can use StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)
来创建一个流。
下面是这样一个 Spliterator
的实现:
public class PagedDaoSpliterator<T> implements Spliterator<T> {
private final PagedDao<T> dao;
private final int blockSize;
private int nextOffset;
private List<T> data;
private int dataIdx;
public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
if (blockSize <= 0)
throw new IllegalArgumentException();
this.dao = Objects.requireNonNull(dao);
this.blockSize = blockSize;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (this.data == null) {
if (this.nextOffset == -1/*At end*/)
return false; // Already at end
this.data = this.dao.getData(this.nextOffset, this.blockSize);
this.dataIdx = 0;
if (this.data.size() < this.blockSize)
this.nextOffset = -1/*At end, after this data*/;
else
this.nextOffset += data.size();
if (this.data.isEmpty()) {
this.data = null;
return false; // At end
}
}
action.accept(this.data.get(this.dataIdx++));
if (this.dataIdx == this.data.size())
this.data = null;
return true;
}
@Override
public Spliterator<T> trySplit() {
return null; // Parallel processing not supported
}
@Override
public long estimateSize() {
return Long.MAX_VALUE; // Unknown
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
我们现在可以使用上面的模拟 DAO 进行测试:
MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false);
stream.forEach(System.out::println);
输出
DEBUG: getData(0, 5)
DEBUG: data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
Row[data=Row #1]
Row[data=Row #2]
Row[data=Row #3]
Row[data=Row #4]
Row[data=Row #5]
DEBUG: getData(5, 5)
DEBUG: data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
Row[data=Row #6]
Row[data=Row #7]
Row[data=Row #8]
Row[data=Row #9]
Row[data=Row #10]
DEBUG: getData(10, 5)
DEBUG: data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
Row[data=Row #11]
Row[data=Row #12]
Row[data=Row #13]
可以看出,我们得到了 13 行数据,以 5 行为一组从数据库中检索。
只有在需要时才会从数据库中检索数据,从而导致内存占用量低,具体取决于块大小和不缓存数据的流操作。
您可以按如下方式在一行中完成:
stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(queryThatReturnsAllRowsOrdered);
Stream.generate(rs.next() ? map(rs) : null)
.takeWhile(Objects::nonNull)
.filter(<some predicate>)
.forEach(<some operation);
这在从查询返回第一行时开始处理,并与数据库并行继续,直到读取所有行。
这种方法一次在内存中只有一行,并且仅通过 运行 1 个查询将数据库负载降至最低。
从 ResultSet
映射比从 Object[]
映射要简单和自然得多,因为您可以通过 name 访问列并使用正确键入的值,例如:
MyDao map(ResultSet rs) {
try {
String someStr = rs.getString("COLUMN_X");
int someInt = rs.getInt("COLUMN_Y"):
return new MyDao(someStr, someInt);
} catch (SQLException e ) {
throw new RuntimeException(e);
}
}
所以基本上我想做的是以下内容:
- 从数据库加载一批数据
- 将该数据(
Object[]
查询结果)映射到 class 以可读格式表示数据 - 写入文件
- 重复直到查询没有更多结果
我列出了我熟悉的似乎符合需要的结构以及为什么它们不符合我的需要。
- 迭代器 → 没有不调用就映射和过滤的选项
next()
- 我需要在 subclass 中定义 map 函数,尽管实际上没有数据(类似于流),这样我就可以将“Stream”方式传递给调用 class 并且只在那里调用
next
,然后调用所有地图函数作为结果
- 我需要在 subclass 中定义 map 函数,尽管实际上没有数据(类似于流),这样我就可以将“Stream”方式传递给调用 class 并且只在那里调用
- 流 → 在映射和过滤成为可能之前,所有数据都需要可用
- Observable → 一旦数据可用就发送数据。不过我需要同步处理它
为了更清楚地了解我要做什么,我举了一个小例子:
// Disclaimer: "Something" is the structure I am not sure of now.
// Could be an Iterator or something else that fits (Thats the question)
public class Orchestrator {
@Inject
private DataGetter dataGetter;
public void doWork() {
FileWriter writer = new FileWriter("filename");
// Write the formatted data to the file
dataGetter.getData()
.forEach(data -> writer.writeToFile(data));
}
}
public class FileWriter {
public void writeToFile(List<Thing> data) {
// Write to file
}
}
public class DataGetter {
@Inject
private ThingDao thingDao;
public Something<List<Thing>> getData() {
// Map data to the correct format and return that
return thingDao.getThings()
.map(partialResult -> /* map to object */);
}
}
public class ThingDao {
public Something<List<Object[]>> getThings() {
Query q = ...;
// Dont know what to return
}
}
到目前为止我得到了什么:
我试图从迭代器的基础开始,因为它是唯一真正满足我的内存要求的。然后我添加了一些方法来映射和循环数据。虽然这并不是一个真正可靠的设计,但它会比我想象的更难,所以我想知道是否已经有任何东西可以满足我的需要。
public class QIterator<E> implements Iterator<List<E>> {
public static String QUERY_OFFSET = "queryOffset";
public static String QUERY_LIMIT = "queryLimit";
private Query query;
private long lastResultIndex = 0;
private long batchSize;
private Function<List<Object>, List<E>> mapper;
public QIterator(Query query, long batchSize) {
this.query = query;
this.batchSize = batchSize;
}
public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
this(query, batchSize);
this.mapper = mapper;
}
@Override
public boolean hasNext() {
return lastResultIndex % batchSize == 0;
}
@Override
public List<E> next() {
query.setParameter(QueryIterator.QUERY_OFFSET, lastResultIndex);
query.setParameter(QueryIterator.QUERY_LIMIT, batchSize);
List<Object> result = (List<Object>) query.getResultList(); // unchecked
lastResultIndex += result.size();
List<E> mappedResult;
if (mapper != null) {
mappedResult = mapper.apply(result);
} else {
mappedResult = (List<E>) result; // unchecked
}
return mappedResult;
}
public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
return new QIterator<>(query, batchSize, (data) -> {
if (this.mapper != null) {
return appendingMapper.apply(this.mapper.apply(data));
} else {
return appendingMapper.apply((List<E>) data);
}
});
}
public void forEach(BiConsumer<List<E>, Integer> consumer) {
for (int i = 0; this.hasNext(); i++) {
consumer.accept(this.next(), i);
}
}
}
目前为止这是有效的,但是有一些我不太喜欢的 unchecked
作业,而且我希望能够将一个 QIterator “附加”到另一个 QIterator 本身并不难,但是它也应该采用追加之后的地图。
假设您有一个以分页方式提供数据的 DAO,例如通过将 LIMIT
和 OFFSET
子句应用于基础 SQL。这样的 DAO class 将有一个方法将这些值作为参数,即该方法将符合以下功能方法:
@FunctionalInterface
public interface PagedDao<T> {
List<T> getData(int offset, int limit);
}
例如调用 getData(0, 20)
会 return 前 20 行(第 1 页),调用 getData(60, 20)
会 return 第 4 页的 20 行。如果方法 return 少超过 20 行,这意味着我们得到了最后一页。在最后一行之后请求数据将 return 一个空列表。
对于下面的演示,我们可以模拟这样一个 DAO class:
public class MockDao {
private final int rowCount;
public MockDao(int rowCount) {
this.rowCount = rowCount;
}
public List<SimpleRow> getSimpleRows(int offset, int limit) {
System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
if (offset < 0 || limit <= 0)
throw new IllegalArgumentException();
List<SimpleRow> data = new ArrayList<>();
for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
data.add(new SimpleRow("Row #" + rowNo));
System.out.println("DEBUG: data = " + data);
return data;
}
}
public class SimpleRow {
private final String data;
public SimpleRow(String data) {
this.data = data;
}
@Override
public String toString() {
return "Row[data=" + this.data + "]";
}
}
如果你想生成一个Stream
of rows from that method, streaming all rows in blocks of a certain size, we need a Spliterator
for that, so we can use StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)
来创建一个流。
下面是这样一个 Spliterator
的实现:
public class PagedDaoSpliterator<T> implements Spliterator<T> {
private final PagedDao<T> dao;
private final int blockSize;
private int nextOffset;
private List<T> data;
private int dataIdx;
public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
if (blockSize <= 0)
throw new IllegalArgumentException();
this.dao = Objects.requireNonNull(dao);
this.blockSize = blockSize;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (this.data == null) {
if (this.nextOffset == -1/*At end*/)
return false; // Already at end
this.data = this.dao.getData(this.nextOffset, this.blockSize);
this.dataIdx = 0;
if (this.data.size() < this.blockSize)
this.nextOffset = -1/*At end, after this data*/;
else
this.nextOffset += data.size();
if (this.data.isEmpty()) {
this.data = null;
return false; // At end
}
}
action.accept(this.data.get(this.dataIdx++));
if (this.dataIdx == this.data.size())
this.data = null;
return true;
}
@Override
public Spliterator<T> trySplit() {
return null; // Parallel processing not supported
}
@Override
public long estimateSize() {
return Long.MAX_VALUE; // Unknown
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
我们现在可以使用上面的模拟 DAO 进行测试:
MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false);
stream.forEach(System.out::println);
输出
DEBUG: getData(0, 5)
DEBUG: data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
Row[data=Row #1]
Row[data=Row #2]
Row[data=Row #3]
Row[data=Row #4]
Row[data=Row #5]
DEBUG: getData(5, 5)
DEBUG: data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
Row[data=Row #6]
Row[data=Row #7]
Row[data=Row #8]
Row[data=Row #9]
Row[data=Row #10]
DEBUG: getData(10, 5)
DEBUG: data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
Row[data=Row #11]
Row[data=Row #12]
Row[data=Row #13]
可以看出,我们得到了 13 行数据,以 5 行为一组从数据库中检索。
只有在需要时才会从数据库中检索数据,从而导致内存占用量低,具体取决于块大小和不缓存数据的流操作。
您可以按如下方式在一行中完成:
stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(queryThatReturnsAllRowsOrdered);
Stream.generate(rs.next() ? map(rs) : null)
.takeWhile(Objects::nonNull)
.filter(<some predicate>)
.forEach(<some operation);
这在从查询返回第一行时开始处理,并与数据库并行继续,直到读取所有行。
这种方法一次在内存中只有一行,并且仅通过 运行 1 个查询将数据库负载降至最低。
从 ResultSet
映射比从 Object[]
映射要简单和自然得多,因为您可以通过 name 访问列并使用正确键入的值,例如:
MyDao map(ResultSet rs) {
try {
String someStr = rs.getString("COLUMN_X");
int someInt = rs.getInt("COLUMN_Y"):
return new MyDao(someStr, someInt);
} catch (SQLException e ) {
throw new RuntimeException(e);
}
}