RxJS 随着时间的推移发出数组项?
RxJS emit array items over time?
我正在尝试一个接一个地发出简单的数组值,中间有 500 毫秒:
var a = Rx.Observable.from([1,2,3]);
a.interval(500).subscribe(function(b) { console.log(b); });
但是,这会引发异常:
Uncaught TypeError: a.interval is not a function.
Rx.Observable 实例没有 interval
方法 http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/index.html。你可以这样使用。
Rx.Observable.interval(500)
.map(function(v) { return [1,2,3];})
.subscribe(console.log.bind(console));
正如 xgrommx 已经指出的,interval
不是 observable 的实例成员,而是 Rx.Observable
.
的静态成员
Rx.Observable.fromArray([1,2,3]).zip(
Rx.Observable.interval(500), function(a, b) { return a; })
.subscribe(
function(x) { document.write(x + '<br \>'); },
null,
function() { document.write("complete"); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
var arrayList = [1,2,3,4,5];
var source = Rx.Observable
.interval(500/* ms */)
.timeInterval()
.take(arrayList.length);
source.subscribe(function(idx){
console.log(arrayList[idx]);
//or document.write or whatever needed
});
我会这样做:
var fruits = ['apple', 'orange', 'banana', 'apple'];
var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]);
observable.subscribe(t => {
console.log(t);
document.body.appendChild(document.createTextNode(t + ', '));
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
很晚了,但更简单的解决方案是:
const arr = ["Hi,", "how", "may", "I", "help", "you?"];
Rx.Observable.interval(500)
.takeWhile(_ => _ < arr.length)
.map(_ => arr[_])
.subscribe(_ => console.log(_))
我发现 Weichhold 技术是最好的,但它可以通过在 zip 之外提取压缩值来获得清晰的意图:
// assume some input stream of values:
var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8);
// emit each value from stream at a given interval:
var events = Obs.zip(inputs, Obs.interval(1000))
.map(val => val[0])
.forEach(console.log);
三种方法,使用 RxJS 版本 6 :
1。使用 concatMap
import { from, of, pipe } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
from(array)
.pipe(
concatMap(val => of(val).pipe(delay(1000))),
)
.subscribe(console.log);
2。使用 zip
和 interval
import { from, pipe, interval } from 'rxjs';
import { delay, zip} from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
from(array)
.pipe(
zip(interval(1000), (a, b) => a),
)
.subscribe(console.log);
3。使用 interval
作为来源
import { interval, pipe } from 'rxjs';
import { map, take } from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
interval(1000)
.pipe(
take(array.length),
map(i => array[i])
)
.subscribe(console.log);
我有一些不同的要求,我的阵列也随着时间的推移不断更新。所以基本上我必须实现一个可以定期出列的队列,但我不想使用间隔。
如果有人需要这样的东西,那么这个解决方案可能会有所帮助:
我有一个函数 createQueue()
将数组作为输入,returns 一个 Observable,我们订阅它以定期侦听来自数组的事件。
该函数还修改了 passes 数组的 'push()' 方法,以便无论何时将任何项目推入数组,Observable 都会发出。
createQueue(queue: string[]) {
return Observable.create((obs: Observer<void>) => {
const arrayPush = queue.push;
queue.push = (data: string) => {
const returnVal = arrayPush.call(queue, data);
obs.next();
return returnVal;
}
}).pipe(switchMap(() => {
return from([...queue])
.pipe(
concatMap(val => of(val)
.pipe(delay(1000)))
);
}), tap(_ => queue.shift()))
}
假设数组是:taskQueue = [];
所以,我们需要将它传递给上面的函数并订阅它。
createQueue(taskQueue).subscribe((data) => {
console.log('Data from queue => ', data);
});
现在,每次我们taskQueue.push('<something here>')
,订阅都会在延迟“1000ms”后触发。
请注意:我们不应该在调用 createQueue()
之后将新数组分配给 taskQueue
,否则我们将丢失修改后的 push()
.
这是上述实现的虚拟示例:Test Example
如果你想随着时间的推移发布样本,你可以这样做
const observable = interval(100).pipe(
scan((acc, value) => [value, ...acc], []),
sampleTime(10000),
map((acc) => acc[0])
);
我正在尝试一个接一个地发出简单的数组值,中间有 500 毫秒:
var a = Rx.Observable.from([1,2,3]);
a.interval(500).subscribe(function(b) { console.log(b); });
但是,这会引发异常:
Uncaught TypeError: a.interval is not a function.
Rx.Observable 实例没有 interval
方法 http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/index.html。你可以这样使用。
Rx.Observable.interval(500)
.map(function(v) { return [1,2,3];})
.subscribe(console.log.bind(console));
正如 xgrommx 已经指出的,interval
不是 observable 的实例成员,而是 Rx.Observable
.
Rx.Observable.fromArray([1,2,3]).zip(
Rx.Observable.interval(500), function(a, b) { return a; })
.subscribe(
function(x) { document.write(x + '<br \>'); },
null,
function() { document.write("complete"); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
var arrayList = [1,2,3,4,5];
var source = Rx.Observable
.interval(500/* ms */)
.timeInterval()
.take(arrayList.length);
source.subscribe(function(idx){
console.log(arrayList[idx]);
//or document.write or whatever needed
});
我会这样做:
var fruits = ['apple', 'orange', 'banana', 'apple'];
var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]);
observable.subscribe(t => {
console.log(t);
document.body.appendChild(document.createTextNode(t + ', '));
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
很晚了,但更简单的解决方案是:
const arr = ["Hi,", "how", "may", "I", "help", "you?"];
Rx.Observable.interval(500)
.takeWhile(_ => _ < arr.length)
.map(_ => arr[_])
.subscribe(_ => console.log(_))
我发现 Weichhold 技术是最好的,但它可以通过在 zip 之外提取压缩值来获得清晰的意图:
// assume some input stream of values:
var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8);
// emit each value from stream at a given interval:
var events = Obs.zip(inputs, Obs.interval(1000))
.map(val => val[0])
.forEach(console.log);
三种方法,使用 RxJS 版本 6 :
1。使用 concatMap
import { from, of, pipe } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
from(array)
.pipe(
concatMap(val => of(val).pipe(delay(1000))),
)
.subscribe(console.log);
2。使用 zip
和 interval
import { from, pipe, interval } from 'rxjs';
import { delay, zip} from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
from(array)
.pipe(
zip(interval(1000), (a, b) => a),
)
.subscribe(console.log);
3。使用 interval
作为来源
import { interval, pipe } from 'rxjs';
import { map, take } from 'rxjs/operators';
const array = [1, 2, 3, 4, 5];
interval(1000)
.pipe(
take(array.length),
map(i => array[i])
)
.subscribe(console.log);
我有一些不同的要求,我的阵列也随着时间的推移不断更新。所以基本上我必须实现一个可以定期出列的队列,但我不想使用间隔。
如果有人需要这样的东西,那么这个解决方案可能会有所帮助:
我有一个函数 createQueue()
将数组作为输入,returns 一个 Observable,我们订阅它以定期侦听来自数组的事件。
该函数还修改了 passes 数组的 'push()' 方法,以便无论何时将任何项目推入数组,Observable 都会发出。
createQueue(queue: string[]) {
return Observable.create((obs: Observer<void>) => {
const arrayPush = queue.push;
queue.push = (data: string) => {
const returnVal = arrayPush.call(queue, data);
obs.next();
return returnVal;
}
}).pipe(switchMap(() => {
return from([...queue])
.pipe(
concatMap(val => of(val)
.pipe(delay(1000)))
);
}), tap(_ => queue.shift()))
}
假设数组是:taskQueue = [];
所以,我们需要将它传递给上面的函数并订阅它。
createQueue(taskQueue).subscribe((data) => {
console.log('Data from queue => ', data);
});
现在,每次我们taskQueue.push('<something here>')
,订阅都会在延迟“1000ms”后触发。
请注意:我们不应该在调用 createQueue()
之后将新数组分配给 taskQueue
,否则我们将丢失修改后的 push()
.
这是上述实现的虚拟示例:Test Example
如果你想随着时间的推移发布样本,你可以这样做
const observable = interval(100).pipe(
scan((acc, value) => [value, ...acc], []),
sampleTime(10000),
map((acc) => acc[0])
);