创建一个以有限速率发出项目的 Flowable,以避免需要缓冲事件
Creating a Flowable that emits items at a limited rate to avoid the need to buffer events
我有一个数据访问对象,它将数据源中的每个项目传递给消费者:
public interface Dao<T> {
void forEachItem(Consumer<T> item);
}
这始终以单线程方式生成项目 - 我目前无法更改此界面。
我想从这个界面创建一个Flowable
:
private static Flowable<String> flowable(final Dao dao) {
return Flowable.create(emitter -> {
dao.forEachItem(item ->
emitter.onNext(item));
emitter.onComplete();
}, ERROR);
}
如果我在处理时间比项目发射速度长的情况下使用此 Flowable
,那么我可以理解地得到一个丢失的背压异常,因为我正在使用 ERROR
模式:
Dao<String> exampleDao =
itemConsumer ->
IntStream.range(0, 1_000).forEach(i ->
itemConsumer.accept(String.valueOf(i)));
flowable(exampleDao)
.map(v -> {
Thread.sleep(100);
return "id:" + v;
})
.blockingSubscribe(System.out::println);
我不希望缓冲项目 - 似乎这可能会导致非常大的数据集耗尽内存 - 如果操作比生产者慢得多。
我希望有一个背压模式,允许发射器在检测到背压时通过 next/completion 事件阻塞,但情况似乎并非如此?
在我的例子中,据我所知,dao 以单线程方式生成项目,我认为我可以做类似的事情:
dao.forEachItem(item -> {
while (emitter.requested() == 0) {
waitABit();
}
emitter.onNext(item)
});
但这似乎永远挂着。
我的方法错在哪里? :-) 鉴于我的(相对限制性的)一系列情况,是否有一种方法可以以尊重下游背压的方式生产物品?
我知道我可以用一个单独的进程写入一个队列,然后根据从该队列消费来编写一个 Flowable 来做到这一点——那会是首选方法吗?
检查Flowable的部分,尤其是Supscription.request(long)
的部分。我希望这能让你走上正确的道路。
此示例中的 TestProducer
在给定范围内生成 Integer
个对象并将它们推送到其 Subscriber
。它扩展了 Flowable<Integer>
class。对于新订阅者,它会创建一个 Subscription
对象,其 request(long)
方法用于创建和发布整数值。
对于传递给 subscriber
的 Subscription
来说很重要的一点是,可以从此递归调用订阅者上调用 onNext()
的 request()
方法onNext()
打电话。为了防止堆栈溢出,所示的实现使用 outStandingRequests
计数器和 isProducing
标志。
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
本例中的 Consumer 扩展了 DefaultSubscriber<Integer>
并在开始时和消费完 Integer 后请求下一个。在使用 Integer 值时,会有一点延迟,因此将为生产者建立背压。
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
在下面的主要测试方法中 class 创建并连接了生产者和消费者:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
当运行示例时,日志文件显示消费者持续运行,而生产者仅在需要重新填充rxjava2的内部Flowable缓冲区时才激活。
我有一个数据访问对象,它将数据源中的每个项目传递给消费者:
public interface Dao<T> {
void forEachItem(Consumer<T> item);
}
这始终以单线程方式生成项目 - 我目前无法更改此界面。
我想从这个界面创建一个Flowable
:
private static Flowable<String> flowable(final Dao dao) {
return Flowable.create(emitter -> {
dao.forEachItem(item ->
emitter.onNext(item));
emitter.onComplete();
}, ERROR);
}
如果我在处理时间比项目发射速度长的情况下使用此 Flowable
,那么我可以理解地得到一个丢失的背压异常,因为我正在使用 ERROR
模式:
Dao<String> exampleDao =
itemConsumer ->
IntStream.range(0, 1_000).forEach(i ->
itemConsumer.accept(String.valueOf(i)));
flowable(exampleDao)
.map(v -> {
Thread.sleep(100);
return "id:" + v;
})
.blockingSubscribe(System.out::println);
我不希望缓冲项目 - 似乎这可能会导致非常大的数据集耗尽内存 - 如果操作比生产者慢得多。
我希望有一个背压模式,允许发射器在检测到背压时通过 next/completion 事件阻塞,但情况似乎并非如此?
在我的例子中,据我所知,dao 以单线程方式生成项目,我认为我可以做类似的事情:
dao.forEachItem(item -> {
while (emitter.requested() == 0) {
waitABit();
}
emitter.onNext(item)
});
但这似乎永远挂着。
我的方法错在哪里? :-) 鉴于我的(相对限制性的)一系列情况,是否有一种方法可以以尊重下游背压的方式生产物品?
我知道我可以用一个单独的进程写入一个队列,然后根据从该队列消费来编写一个 Flowable 来做到这一点——那会是首选方法吗?
检查Flowable的部分,尤其是Supscription.request(long)
的部分。我希望这能让你走上正确的道路。
此示例中的 TestProducer
在给定范围内生成 Integer
个对象并将它们推送到其 Subscriber
。它扩展了 Flowable<Integer>
class。对于新订阅者,它会创建一个 Subscription
对象,其 request(long)
方法用于创建和发布整数值。
对于传递给 subscriber
的 Subscription
来说很重要的一点是,可以从此递归调用订阅者上调用 onNext()
的 request()
方法onNext()
打电话。为了防止堆栈溢出,所示的实现使用 outStandingRequests
计数器和 isProducing
标志。
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
本例中的 Consumer 扩展了 DefaultSubscriber<Integer>
并在开始时和消费完 Integer 后请求下一个。在使用 Integer 值时,会有一点延迟,因此将为生产者建立背压。
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
在下面的主要测试方法中 class 创建并连接了生产者和消费者:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
当运行示例时,日志文件显示消费者持续运行,而生产者仅在需要重新填充rxjava2的内部Flowable缓冲区时才激活。