RXJS Observables - 运行 每 500 毫秒重复一次函数并将结果输出到可观察流

RXJS Observables - Run a repeating function every 500ms and output results to an observable stream

首先 - 我是 Observables 的新手。来自 Python.

我正在努力实现:

下面的代码是有效的 - 除了它不重复。我不知道 RXJS 6 的定时器、延迟或间隔。


// In this example the chart has 3 horizontal lines at 5,10 and 17 "prices"
// Desired output is 5 -- 10 -- 17
// When 4th line is added at 20 "price" expected output would be
// 5 -- 10 -- 17 -- 20


const test$ = new Observable<number>(observer => {
      chart
      .activeChart()
      .getAllShapes()
      .forEach((shape) => {
        if (shape.name == 'horizontal_line') {
          chart
            .activeChart()
            .getShapeById(shape.id)
            .getPoints()
            .forEach((point) => {
              observer.next(+point.price.toFixed(4));
            });
        }
      })

    }).pipe(
      delay(500),
      repeat()

一种选择是使用 setTimeout 函数来重复您的代码,而不是使用 observable。

这是一个这样做的例子。

export class SomeComponent implements OnInit,OnDestroy {

    Alive = true

    ngOnInit(){
        this.repeating_function()
        
    }
    ngOnDestroy(){
        this.Alive = false
    }

    repeating_function(){
        // some code

        //repeat the function every 500 ms while the component is active
        if (this.Alive){
            setTimeout(() => {this.repeating_function()}, 500);
        }
    }
}

除非您的 observable 完成,否则 repeat 将不起作用:

const test$ = new Observable<number>(observer => {
  chart
    .activeChart()
    .getAllShapes()
    .forEach((shape) => {
      if (shape.name == 'horizontal_line') {
        chart
          .activeChart()
          .getShapeById(shape.id)
          .getPoints()
          .forEach((point) => {
            observer.next(+point.price.toFixed(4));
          });
      }
    })

  observer.complete(); // <-- Complete observable after all emissions
}).pipe(
  delay(500),
  repeat()
);

Run a repeating function every 500ms and output results to an observable stream

我认为最简单的方法是使用 interval (or timer 如果您需要立即发射 ) 每 500 毫秒创建一个发射流,然后简单地将发射映射到您的函数return 值:

test$ = interval(500).pipe(
    map(() => theFunction())
);

这是这个简单示例的工作 StackBlitz。显然,您可以让该函数执行您需要它执行的任何操作:-)

我不确定到目前为止你用 RxJS timerinterval 尝试了什么。您可以使用 timer 并使用 map 运算符映射到所需数据。然后,您可以使用 Array#mapArray#filter 来根据要求转换数据。

timer(0, 500).pipe(   // <-- emit immediately and after every 500ms thereafter
  map(_ => 
    chart
      .activeChart()
      .getAllShapes()
      .map(shape => {
        if (shape.name === 'horizontal_line')
          return shape.getPoints()
      })
      .filter((points) => !!points)  // <-- filter `undefined` from previous step
  )
).subscribe({
  next: points => {
    /* output:
      [
        [ <collection of points from line 5> ],
        [ <collection of points from line 10> ],
        [ <collection of points from line 17> ],
        ...
      ]
    */
  }
});

如果出于某种原因你希望将所有点组合到一个数组中并将它们连续发送到订阅中,你需要将 swithcMap 运算符与 RxJS from 函数一起使用。

timer(0, 500).pipe(
  map(_ => 
    chart
      .activeChart()
      .getAllShapes()
      .map(shape => {
        if (shape.name === 'horizontal_line')
          return shape.getPoints()
      })
      .filter((points) => !!points)  // <-- filter `undefined` from previous step
  ),
  switchMap(pointsColl => {
    return from(pointsColl.flat())
  })
).subscribe({
  next: points => {
    /* output:
      point 1 from line 5
      point 2 from line 5
      ...
    */
  }
});

如果您需要将所有点的集合作为单个数组发出,则需要使用 of 函数而不是 from

// <repeat from above>
  switchMap(pointsColl => {
    return of(pointsColl.flat())
  })
).subscribe({
  next: points => {
    /* output:
      [ <collection of points from line 5, 10 and 17> ]
    */
  }
});