RXJS Observables - 运行 每 500 毫秒重复一次函数并将结果输出到可观察流
RXJS Observables - Run a repeating function every 500ms and output results to an observable stream
首先 - 我是 Observables 的新手。来自 Python.
我正在努力实现:
- 每 500 毫秒从图表调用一个函数到 return 个形状
- 形状return排列在数组中
- 我只想要过滤后的子集“horizontal_lines”
- 通过 observable
输出“horizontal_lines”
下面的代码是有效的 - 除了它不重复。我不知道 RXJS 6 的定时器、延迟或间隔。
// In this example the chart has 3 horizontal lines at 5,10 and 17 "prices"
// Desired output is 5 -- 10 -- 17
// When 4th line is added at 20 "price" expected output would be
// 5 -- 10 -- 17 -- 20
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
}).pipe(
delay(500),
repeat()
一种选择是使用 setTimeout 函数来重复您的代码,而不是使用 observable。
这是一个这样做的例子。
export class SomeComponent implements OnInit,OnDestroy {
Alive = true
ngOnInit(){
this.repeating_function()
}
ngOnDestroy(){
this.Alive = false
}
repeating_function(){
// some code
//repeat the function every 500 ms while the component is active
if (this.Alive){
setTimeout(() => {this.repeating_function()}, 500);
}
}
}
除非您的 observable 完成,否则 repeat
将不起作用:
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
observer.complete(); // <-- Complete observable after all emissions
}).pipe(
delay(500),
repeat()
);
Run a repeating function every 500ms and output results to an observable stream
我认为最简单的方法是使用 interval
(or timer
如果您需要立即发射 ) 每 500 毫秒创建一个发射流,然后简单地将发射映射到您的函数return 值:
test$ = interval(500).pipe(
map(() => theFunction())
);
这是这个简单示例的工作 StackBlitz。显然,您可以让该函数执行您需要它执行的任何操作:-)
我不确定到目前为止你用 RxJS timer
和 interval
尝试了什么。您可以使用 timer
并使用 map
运算符映射到所需数据。然后,您可以使用 Array#map
和 Array#filter
来根据要求转换数据。
timer(0, 500).pipe( // <-- emit immediately and after every 500ms thereafter
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
)
).subscribe({
next: points => {
/* output:
[
[ <collection of points from line 5> ],
[ <collection of points from line 10> ],
[ <collection of points from line 17> ],
...
]
*/
}
});
如果出于某种原因你希望将所有点组合到一个数组中并将它们连续发送到订阅中,你需要将 swithcMap
运算符与 RxJS from
函数一起使用。
timer(0, 500).pipe(
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
),
switchMap(pointsColl => {
return from(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
point 1 from line 5
point 2 from line 5
...
*/
}
});
如果您需要将所有点的集合作为单个数组发出,则需要使用 of
函数而不是 from
。
// <repeat from above>
switchMap(pointsColl => {
return of(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
[ <collection of points from line 5, 10 and 17> ]
*/
}
});
首先 - 我是 Observables 的新手。来自 Python.
我正在努力实现:
- 每 500 毫秒从图表调用一个函数到 return 个形状
- 形状return排列在数组中
- 我只想要过滤后的子集“horizontal_lines”
- 通过 observable 输出“horizontal_lines”
下面的代码是有效的 - 除了它不重复。我不知道 RXJS 6 的定时器、延迟或间隔。
// In this example the chart has 3 horizontal lines at 5,10 and 17 "prices"
// Desired output is 5 -- 10 -- 17
// When 4th line is added at 20 "price" expected output would be
// 5 -- 10 -- 17 -- 20
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
}).pipe(
delay(500),
repeat()
一种选择是使用 setTimeout 函数来重复您的代码,而不是使用 observable。
这是一个这样做的例子。
export class SomeComponent implements OnInit,OnDestroy {
Alive = true
ngOnInit(){
this.repeating_function()
}
ngOnDestroy(){
this.Alive = false
}
repeating_function(){
// some code
//repeat the function every 500 ms while the component is active
if (this.Alive){
setTimeout(() => {this.repeating_function()}, 500);
}
}
}
除非您的 observable 完成,否则 repeat
将不起作用:
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
observer.complete(); // <-- Complete observable after all emissions
}).pipe(
delay(500),
repeat()
);
Run a repeating function every 500ms and output results to an observable stream
我认为最简单的方法是使用 interval
(or timer
如果您需要立即发射 ) 每 500 毫秒创建一个发射流,然后简单地将发射映射到您的函数return 值:
test$ = interval(500).pipe(
map(() => theFunction())
);
这是这个简单示例的工作 StackBlitz。显然,您可以让该函数执行您需要它执行的任何操作:-)
我不确定到目前为止你用 RxJS timer
和 interval
尝试了什么。您可以使用 timer
并使用 map
运算符映射到所需数据。然后,您可以使用 Array#map
和 Array#filter
来根据要求转换数据。
timer(0, 500).pipe( // <-- emit immediately and after every 500ms thereafter
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
)
).subscribe({
next: points => {
/* output:
[
[ <collection of points from line 5> ],
[ <collection of points from line 10> ],
[ <collection of points from line 17> ],
...
]
*/
}
});
如果出于某种原因你希望将所有点组合到一个数组中并将它们连续发送到订阅中,你需要将 swithcMap
运算符与 RxJS from
函数一起使用。
timer(0, 500).pipe(
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
),
switchMap(pointsColl => {
return from(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
point 1 from line 5
point 2 from line 5
...
*/
}
});
如果您需要将所有点的集合作为单个数组发出,则需要使用 of
函数而不是 from
。
// <repeat from above>
switchMap(pointsColl => {
return of(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
[ <collection of points from line 5, 10 and 17> ]
*/
}
});