如何从样本数组创建 Observable?
How do I create Observable from an array of samples?
假设我们有一些从传感器读取的样本缓冲区。每次我 运行 一个方法(即 buffer.fetchNewSamples()
)时,缓冲区都会被送入新样本。我将如何从这样的对象创建 Java Rx Observable,该对象会 return 一个一个地从缓冲区中提取每个样本,并在发出最后一个样本时调用 buffer.fetchNewSamples()
然后以相同的方式继续?
我刚开始使用 Rx,虽然创建一个 returns 的可观察对象非常容易,即此类缓冲区的平均值,但我不知道如何在不创建新的情况下创建上述内容线程并做一些同步...
给定一个单一的感官 API float[] measure()
,您可以通过以下顺序进行定时测量和发射:
Observable.interval(10, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(t -> measure())
.concatMap(fa -> Observable.range(0, fa.length).map(i -> fa[i]))
.subscribe(...)
说明
给定 10 毫秒的间隔,删除任何非请求的内容并将计时器值映射到测量值数组(每 10 毫秒发生一次)。给定一个接一个发射的测量值数组,通过等效的索引 for 循环将其展平到其数组元素,但要确保下一个测量值仅在前一个测量值发射完数组元素后才出现。最后,你有一个花车流。
如果你想在上一次调用的所有测量值都被消耗后立即测量,你可以执行以下操作:
BehaviorSubject<Integer> bs = BehaviorSubject.create(1);
bs.observeOn(Schedulers.trampoline())
.map(t -> measure())
.concatMap(fa ->
Observable.range(0, fa.length).map(i -> fa[i])
.finallyDo(() -> bs.onNext(1))
)
说明
我们利用 BehaviorSubject 的行为将其存储的值发送给第一个订阅者,我们将使用它来触发测量。为了避免无限递归,我们将在蹦床调度程序上观察主题。一旦信号通过,我们就进行测量并将其转换为如上所示的一系列浮点数。唯一的区别是,每当这样的子序列完成时,它都会向 BehaviorSubject 发出信号以触发另一次测量。
假设我们有一些从传感器读取的样本缓冲区。每次我 运行 一个方法(即 buffer.fetchNewSamples()
)时,缓冲区都会被送入新样本。我将如何从这样的对象创建 Java Rx Observable,该对象会 return 一个一个地从缓冲区中提取每个样本,并在发出最后一个样本时调用 buffer.fetchNewSamples()
然后以相同的方式继续?
我刚开始使用 Rx,虽然创建一个 returns 的可观察对象非常容易,即此类缓冲区的平均值,但我不知道如何在不创建新的情况下创建上述内容线程并做一些同步...
给定一个单一的感官 API float[] measure()
,您可以通过以下顺序进行定时测量和发射:
Observable.interval(10, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(t -> measure())
.concatMap(fa -> Observable.range(0, fa.length).map(i -> fa[i]))
.subscribe(...)
说明
给定 10 毫秒的间隔,删除任何非请求的内容并将计时器值映射到测量值数组(每 10 毫秒发生一次)。给定一个接一个发射的测量值数组,通过等效的索引 for 循环将其展平到其数组元素,但要确保下一个测量值仅在前一个测量值发射完数组元素后才出现。最后,你有一个花车流。
如果你想在上一次调用的所有测量值都被消耗后立即测量,你可以执行以下操作:
BehaviorSubject<Integer> bs = BehaviorSubject.create(1);
bs.observeOn(Schedulers.trampoline())
.map(t -> measure())
.concatMap(fa ->
Observable.range(0, fa.length).map(i -> fa[i])
.finallyDo(() -> bs.onNext(1))
)
说明 我们利用 BehaviorSubject 的行为将其存储的值发送给第一个订阅者,我们将使用它来触发测量。为了避免无限递归,我们将在蹦床调度程序上观察主题。一旦信号通过,我们就进行测量并将其转换为如上所示的一系列浮点数。唯一的区别是,每当这样的子序列完成时,它都会向 BehaviorSubject 发出信号以触发另一次测量。