从集合创建元素流
Creating stream of elements from collection
我正在使用 Junit 5 动态测试。
我的目的是从集合中创建一个元素流,以将其传递给 JUnit5 中的测试。
但是,使用此代码,我只能 运行 1000 条记录。我如何使这项工作无缝无阻塞。
MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());
FindIterable<Document> f = collection.find().batchSize(1000);
f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {
@Override
public void onResult(AsyncBatchCursor<Document> t, Throwable thrwbl) {
t.next(new SingleResultCallback<List<Document>>() {
@Override
public void onResult(List<Document> t, Throwable thrwbl) {
if (thrwbl != null) {
th.set(thrwbl);
}
cache.addAll(t);
latch.countDown();;
}
});
}
});
latch.await();
return cache.stream().map(batch->process(batch));
更新代码
@ParameterizedTest
@MethodSource("setUp")
void cacheTest(MyClazz myclass) throws Exception {
assertTrue(doTest(myclass));
}
public static MongoClient getMongoClient() {
// get client here
}
private static Stream<MyClazz> setUp() throws Exception {
MongoDatabase mydatabase = getMongoClient().getDatabase("test");
List<Throwable> failures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
mydatabase.getCollection("testcollection").find()
.toObservable().subscribe(
document -> {
list.add(process(document));
},
throwable -> {
failures.add(throwable);
},
() -> {
latch.countDown();
});
latch.await();
return list.stream();
}
public boolean doTest(MyClazz myclass) {
// processing goes here
}
public MyClazz process(Document doc) {
// doc gets converted to MyClazz
return MyClazz;
}
即使是现在,我也看到加载了所有数据,然后进行了单元测试。
我认为这是因为 latch.await()。但是,如果我删除它,则可能没有测试用例 运行 因为数据库可能正在加载集合。
我的用例是:我在 mongo 中有数百万条记录,并且正在 运行 与它们进行集成测试。将它们全部加载到内存中是不可行的,因此我正在尝试流式解决方案。
我不认为我完全理解你的用例,但考虑到你的问题被标记为 java
和 mongo-asyc-driver
这个要求当然是可以实现的:
create a stream of elements from the collection to pass it on to test ... make this work seamlessly non-blocking
以下代码:
- 使用 MongoDB RxJava 驱动程序查询集合
- 从该集合中创建 Rx
Observable
- 订阅
Observable
- 记录异常
完成标记
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> failures = new ArrayList<>();
collection.find()
.toObservable().subscribe(
// on next, this is invoked for each document returned by your find call
document -> {
// presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5"
System.out.println(document);
},
/// on error
throwable -> {
failures.add(throwable);
},
// on completion
() -> {
latch.countDown();
});
// await the completion event
latch.await();
备注:
- 这需要使用 MongoDB RxJava driver(即
com.mongodb.rx.client
命名空间中的 类 ... org.mongodb::mongodb-driver-rx
Maven 工件)
- 在你的问题中你正在调用
collection.find().batchSize()
这清楚地表明你 当前 使用 Rx 驱动程序(因为 batchSize
不是 Rx 友好的概念:)
- 以上代码已通过 MongoDB RxJava 驱动 v1.4.0 和
io.reactive::rxjava
v1.1.10 验证
更新 1:根据对您问题的更改(遵循我原来的回答),您问过:" 我看到所有数据都是加载后进行单元测试。我认为这是因为 latch.await()"?我认为您正在从可观察流中弹出[从可观察流中提取列表,并且仅在可观察量耗尽后才开始调用doTest()
。这种方法涉及 (1) 来自 MongoDB 的流式处理结果; (2) 将这些结果存储在内存中; (3) 运行 doTest()
每一个存储的结果。如果你真的想全程流式传输,那么你应该从你的 observable 订阅中调用 doTest()
。例如:
mydatabase.getCollection("testcollection").find()
.toObservable().subscribe(
document -> {
doTest(process(document));
},
throwable -> {
failures.add(throwable);
},
() -> {
latch.countDown();
});
latch.await();
上面的代码将调用 doTest()
,因为它从 MongoDB 接收每个文档,当整个 observable 耗尽时,锁存器将递减,您的代码将完成。
我正在使用 Junit 5 动态测试。 我的目的是从集合中创建一个元素流,以将其传递给 JUnit5 中的测试。 但是,使用此代码,我只能 运行 1000 条记录。我如何使这项工作无缝无阻塞。
MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());
FindIterable<Document> f = collection.find().batchSize(1000);
f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {
@Override
public void onResult(AsyncBatchCursor<Document> t, Throwable thrwbl) {
t.next(new SingleResultCallback<List<Document>>() {
@Override
public void onResult(List<Document> t, Throwable thrwbl) {
if (thrwbl != null) {
th.set(thrwbl);
}
cache.addAll(t);
latch.countDown();;
}
});
}
});
latch.await();
return cache.stream().map(batch->process(batch));
更新代码
@ParameterizedTest
@MethodSource("setUp")
void cacheTest(MyClazz myclass) throws Exception {
assertTrue(doTest(myclass));
}
public static MongoClient getMongoClient() {
// get client here
}
private static Stream<MyClazz> setUp() throws Exception {
MongoDatabase mydatabase = getMongoClient().getDatabase("test");
List<Throwable> failures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
mydatabase.getCollection("testcollection").find()
.toObservable().subscribe(
document -> {
list.add(process(document));
},
throwable -> {
failures.add(throwable);
},
() -> {
latch.countDown();
});
latch.await();
return list.stream();
}
public boolean doTest(MyClazz myclass) {
// processing goes here
}
public MyClazz process(Document doc) {
// doc gets converted to MyClazz
return MyClazz;
}
即使是现在,我也看到加载了所有数据,然后进行了单元测试。 我认为这是因为 latch.await()。但是,如果我删除它,则可能没有测试用例 运行 因为数据库可能正在加载集合。
我的用例是:我在 mongo 中有数百万条记录,并且正在 运行 与它们进行集成测试。将它们全部加载到内存中是不可行的,因此我正在尝试流式解决方案。
我不认为我完全理解你的用例,但考虑到你的问题被标记为 java
和 mongo-asyc-driver
这个要求当然是可以实现的:
create a stream of elements from the collection to pass it on to test ... make this work seamlessly non-blocking
以下代码:
- 使用 MongoDB RxJava 驱动程序查询集合
- 从该集合中创建 Rx
Observable
- 订阅
Observable
- 记录异常
完成标记
CountDownLatch latch = new CountDownLatch(1); List<Throwable> failures = new ArrayList<>(); collection.find() .toObservable().subscribe( // on next, this is invoked for each document returned by your find call document -> { // presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5" System.out.println(document); }, /// on error throwable -> { failures.add(throwable); }, // on completion () -> { latch.countDown(); }); // await the completion event latch.await();
备注:
- 这需要使用 MongoDB RxJava driver(即
com.mongodb.rx.client
命名空间中的 类 ...org.mongodb::mongodb-driver-rx
Maven 工件) - 在你的问题中你正在调用
collection.find().batchSize()
这清楚地表明你 当前 使用 Rx 驱动程序(因为batchSize
不是 Rx 友好的概念:) - 以上代码已通过 MongoDB RxJava 驱动 v1.4.0 和
io.reactive::rxjava
v1.1.10 验证
更新 1:根据对您问题的更改(遵循我原来的回答),您问过:" 我看到所有数据都是加载后进行单元测试。我认为这是因为 latch.await()"?我认为您正在从可观察流中弹出[从可观察流中提取列表,并且仅在可观察量耗尽后才开始调用doTest()
。这种方法涉及 (1) 来自 MongoDB 的流式处理结果; (2) 将这些结果存储在内存中; (3) 运行 doTest()
每一个存储的结果。如果你真的想全程流式传输,那么你应该从你的 observable 订阅中调用 doTest()
。例如:
mydatabase.getCollection("testcollection").find()
.toObservable().subscribe(
document -> {
doTest(process(document));
},
throwable -> {
failures.add(throwable);
},
() -> {
latch.countDown();
});
latch.await();
上面的代码将调用 doTest()
,因为它从 MongoDB 接收每个文档,当整个 observable 耗尽时,锁存器将递减,您的代码将完成。