基于资源稀缺性的背压可观察性
Backpressure observable based on resource scarcity
在 RxJava 1 / RxScala 中,我如何 throttle/backpressure 在以下情况下可以观察到源?
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
一个可能的解决方案就是阻塞直到。哪个有效,但这非常不优雅并且会阻止多个同时请求:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
flatMap有一个限制并发订阅者数量的参数。如果你使用这个 flatMap 会为你处理背压。
def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))
在 RxJava 1 / RxScala 中,我如何 throttle/backpressure 在以下情况下可以观察到源?
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
一个可能的解决方案就是阻塞直到。哪个有效,但这非常不优雅并且会阻止多个同时请求:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
flatMap有一个限制并发订阅者数量的参数。如果你使用这个 flatMap 会为你处理背压。
def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))