如何在 Observable 中循环并限制每次获取的项目数

How to loop and limit the number of items fetched each time in an Observable

我有以下 Observable,它接收 kafka 消费者记录并将它们插入数据库。 它目前正在工作,我可以在消费者中接收预期的数据并提取这些数据以执行一些映射并将其放入列表中。 此列表中的数据将被插入到数据库中。

按照现在的写法,它将尝试同时插入所有内容。 kafka 记录有可能保存 100k - 100 万条记录。 我正在寻找一种方法来打破它,这样我只从消费者记录中取出 1000 件商品,插入数据库并再次重复接下来的 1000 件商品并继续直到记录为空。这可能吗?

我尝试使用 take、takeuntil 和 repeat 的变体,但它们不起作用。在我订阅之后,调用刚刚结束,当我执行这些操作时甚至没有进入可观察对象。

我能否得到一些关于如何编写此代码的建议,以便我可以从 kafka 记录中获取 1000 条记录,将它们插入数据库并继续这样做直到完成所有 kafka 记录?谢谢

请注意我使用的是 RXJava 1,需要坚持使用这个版本。

private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();

public Observable<KafkaConsumerRecord<String, CustomObj>> dbInsert(KafkaConsumerRecords<String, CustomObj> records) {

    return Observable.just(records.getDelegate().records())
            // attempting to loop based on following counts. Not preferred but unsure of a better way.
            // the figures captured here are correct.
            // plus this doesn't currently matter anyway cos not able to get it to work using takeUntil, repeat. 
            .doOnSubscribe(() -> {
                RECORD_COUNT.set(records.getDelegate().records().count());
                REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
                REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
            })
            .map(consumerRecords -> consumerRecords.records("Topic name"))
            .map(it -> {
                List<CustomRequest> requests = new ArrayList<>();
                it.forEach(r -> {
                    ConsumerRecord<String, SomeObj> record = (ConsumerRecord<String, SomeObj>) r;
                    CustomRequest request = new CustomRequest (
                        new String(record.headers().headers("id").iterator().next().value(), StandardCharsets.UTF_8),
                        Long.parseLong(new String(record.headers().headers("code").iterator().next().value(), StandardCharsets.UTF_8)),
                        record.value()
                    );
                    requests.add(request);
                });
                return requests;
            })
            // nothing happens if I uncomment these. 
            // .takeUntil(customRequests -> customRequests.size() == INSERT_COUNT.get())
            // .repeat(REPEAT_COUNT.get())
            .doOnNext(customRequests -> {
                // planning to do some db inserts here in a transaction of 1000 inserts at a time. 
            })
            .doOnCompleted(() -> System.out.println("Completed"));
}

以下应该适用于 RxJava 1.3.8

rx.Observable.from(List.of(1, 2, 3, 4, 5, 6))
  .buffer(2)
  .doOnNext(r -> System.out.println(r))
  .subscribe();

以下是输出 -

[1, 2]
[3, 4]
[5, 6]

我用下面的版本来测试上面的代码-

<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.8</version>
</dependency>