Observable 非空时默认

Observable defaulting when not empty

我有一些代码从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观察对象,然后写入数据库,将其添加到一个数组(创建一个可观察对象数组), 因此当通过 forkJoin 订阅 observables 数组时,所有必要的数据都会写入数据库。

在数组中的 observables 数量变得相当大之前,这似乎工作得很好。行数可以是 0-6000 之间的任何值,因此数组的大小可以增长到这个值。当它确实达到这个大小时,可观察对象不再写入数据库,而是 returns 来自 defaultIfEmpty 的默认值。我很困惑为什么它在较小数量的可观察对象下正常工作,但在较大数量时突然变空...

结合代码示例可能更清楚一些

function writeToDB() {
   // rows taken from the database, n = 0..6000
   data = []

   // array of observables
   observables = []

   for (const row of data) {
      if (row.age > 20) {
         // websocket between service and database, returns an observable
         const observable = websocket.put(row).pipe(
            o$.catchError((err) => { 
               return r$.of(err) 
            }),
            o$.defaultIfEmpty({
               success: true,
               status: 200
            })
         );

         observables.push(observable);
      }
   }

   return forkJoin([...observables]);
}

使用此示例在订阅时工作得非常好,除了大型数据集,其中数组 observables 的长度约为 5000。那时它开始 return defaultIfEmpty{ success: true, status: 200 } 我不知道为什么...任何帮助或建议将不胜感激。

从您在此处显示的内容看不清楚。尽管如此,如果这适用于较少数量的调用,那么 websocket 很有可能在这些数字上表现出一些奇怪的行为。

值得尝试的方法可能是限制您的 websocket 调用的并发性。

function writeToDB(data) {
  // data contains rows taken from the database, n = 0..6000

  return from(data).pipe(

    filter(row => row.age > 20),

    map(row => websocket.put(row).pipe(
      
      catchError(err => of(err)),

      // last makes sure that mergeAll behaves like forkJoin
      last(undefined, {
         success: true,
         status: 200
      })

    )),

    // mergeAll lets you choose how many can run concurrently
    // for example, at most 50 websocket calls are made at
    // once here
    mergeAll(50),
    toArray()
  );
  
}

在这种情况下,我更喜欢 map mergeAll 而不是 mergeMap(因为我认为您不太可能错过它的并发方面),但您可以使用.


function writeToDB(data) {
  // data contains rows taken from the database, n = 0..6000

  return from(data).pipe(

    filter(row => row.age > 20),

    mergeMap(row => websocket.put(row).pipe(
      
      catchError(err => of(err)),

      // last makes sure that mergeMap behaves like forkJoin
      last(undefined, {
         success: true,
         status: 200
      })

    ), 50), // <- sneaky! ;)

    toArray()
  );
  
}