同步处理可观察订阅事件

Process observable subscribe events synchronously

我正在寻找一种以同步方式处理来自 ReplaySubject.subscribe() 的事件的方法。

let onSomeEvent = new ReplaySubject();

onSomeEvent.subscribe(async (event) => {      
  return await this.saveEventToDb(event);
});

在此示例中,saveEventToDb() 首先检查数据库是否已存储具有相同 ID 的事件。如果没有,它存储它。

问题是我需要考虑从主题触发的重复事件。

在此示例中,当 2 个重复事件接连触发时,两者都会被添加到数据库中,因为 saveEventToDb() 会立即被调用两次,而无需等待上一个调用完成。

如何使用 Rxjs 将它们排队?

以下内容用于同步处理事件:

onSomeEvent
    .map(event => {
       return Observable.defer(() => {
          return this.saveEventToDb(event);
       });
    })
    .concatAll()
    .subscribe();

ConcatAll():收集可观察对象并在上一个完成时订阅下一个。