从集合创建元素流

Creating stream of elements from collection

我正在使用 Junit 5 动态测试。 我的目的是从集合中创建一个元素流,以将其传递给 JUnit5 中的测试。 但是,使用此代码,我只能 运行 1000 条记录。我如何使这项工作无缝无阻塞。

    MongoCollection<Document> collection = mydatabase.getCollection("mycoll");
    final List<Document> cache = Collections.synchronizedList(new ArrayList<Document>());

    FindIterable<Document> f = collection.find().batchSize(1000);
    f.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {

        @Override
        public void onResult(AsyncBatchCursor<Document> t, Throwable thrwbl) {
            t.next(new SingleResultCallback<List<Document>>() {

                @Override
                public void onResult(List<Document> t, Throwable thrwbl) {
                    if (thrwbl != null) {
                        th.set(thrwbl);
                    }
                    cache.addAll(t);
                    latch.countDown();;

                }
            });
        }
    });
    latch.await();
    return cache.stream().map(batch->process(batch));

更新代码

@ParameterizedTest
@MethodSource("setUp")
void cacheTest(MyClazz myclass) throws Exception {
    assertTrue(doTest(myclass));
}
public static MongoClient getMongoClient() {
 // get client here
}

private static Stream<MyClazz> setUp() throws Exception {
    MongoDatabase mydatabase = getMongoClient().getDatabase("test");
    List<Throwable> failures = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);
    List<MyClazz> list = Collections.synchronizedList(new ArrayList<>());
            mydatabase.getCollection("testcollection").find()
            .toObservable().subscribe(
            document -> {
                list.add(process(document));
            },
            throwable -> {
                failures.add(throwable);
            },
            () -> {
                latch.countDown();
            });
    latch.await();
    return list.stream();
}

public boolean doTest(MyClazz myclass) { 
// processing goes here
}
public MyClazz process(Document doc) { 
// doc gets converted to MyClazz
   return MyClazz;
}

即使是现在,我也看到加载了所有数据,然后进行了单元测试。 我认为这是因为 latch.await()。但是,如果我删除它,则可能没有测试用例 运行 因为数据库可能正在加载集合。

我的用例是:我在 mongo 中有数百万条记录,并且正在 运行 与它们进行集成测试。将它们全部加载到内存中是不可行的,因此我正在尝试流式解决方案。

我不认为我完全理解你的用例,但考虑到你的问题被标记为 javamongo-asyc-driver 这个要求当然是可以实现的:

create a stream of elements from the collection to pass it on to test ... make this work seamlessly non-blocking

以下代码:

  • 使用 MongoDB RxJava 驱动程序查询集合
  • 从该集合中创建 Rx Observable
  • 订阅 Observable
  • 记录异常
  • 完成标记

    CountDownLatch latch = new CountDownLatch(1);
    List<Throwable> failures = new ArrayList<>();
    collection.find()
            .toObservable().subscribe(
            // on next, this is invoked for each document returned by your find call
            document -> {
                // presumably you'll want to do something here to meet this requirement: "pass it on to test in JUnit5" 
                System.out.println(document);
            },
            /// on error
            throwable -> {
                failures.add(throwable);
            },
            // on completion
            () -> {
                latch.countDown();
            });
    // await the completion event
    latch.await(); 
    

备注:

  • 这需要使用 MongoDB RxJava driver(即 com.mongodb.rx.client 命名空间中的 类 ... org.mongodb::mongodb-driver-rx Maven 工件)
  • 在你的问题中你正在调用 collection.find().batchSize() 这清楚地表明你 当前 使用 Rx 驱动程序(因为 batchSize 不是 Rx 友好的概念:)
  • 以上代码已通过 MongoDB RxJava 驱动 v1.4.0 和 io.reactive::rxjava
  • v1.1.10 验证

更新 1:根据对您问题的更改(遵循我原来的回答),您问过:" 我看到所有数据都是加载后进行单元测试。我认为这是因为 latch.await()"?我认为您正在从可观察流中弹出[从可观察流中提取列表,并且仅在可观察量耗尽后才开始调用doTest()。这种方法涉及 (1) 来自 MongoDB 的流式处理结果; (2) 将这些结果存储在内存中; (3) 运行 doTest() 每一个存储的结果。如果你真的想全程流式传输,那么你应该从你的 observable 订阅中调用 doTest() 。例如:

mydatabase.getCollection("testcollection").find()
        .toObservable().subscribe(
        document -> {
            doTest(process(document));
        },
        throwable -> {
            failures.add(throwable);
        },
        () -> {
            latch.countDown();
        });
latch.await();

上面的代码将调用 doTest(),因为它从 MongoDB 接收每个文档,当整个 observable 耗尽时,锁存器将递减,您的代码将完成。