我如何强制一个 Observable 完成?

How do I force an observeable to complete?

有点小众问题,但我知道问题所在,所以希望这里有人能帮助我。这是一个 Observable/RXFire 问题,而不是 xstate 问题。

我有一台调用 observable 的机器:

export const tribeMachine = Machine(
  {
    id: "council",
    initial: "init",
    context: {},
    states: {
      init: {
        invoke: {
          id: "gettribes",
          src: () =>
            collectionData(database.collection("tribes")).pipe(
              concatAll(),
              map(x => ({ type: "STORE", x }))
            ),
          onDone: "loaded"
        },
        on: {
          STORE: {
            actions: "storetribes"
          },
          CANCEL: "loaded"
        }
      },
      loaded: {
        entry: () => console.log("loaded")
      },
      error: {
        entry: () => console.log("error")
      }
    }
  },
  {
    actions: {
      storetribes: (context, event) => console.log("hello")
    }
  }
);

它应该工作的方式是机器在加载时调用 observable,然后一旦 obs 完成发出它的值并调用 complete(),invoke.onDone 被调用并且机器转换到'loaded'状态。

当我使用通过 complete() 调用创建的普通可观察对象时,或者当我将 take(#) 添加到 .pipe() 的末尾时,转换有效。

但出于某种原因,来自 RXFire 的 collectionData() 的可观察对象没有发出 'complete' 信号......机器就在那里。

我试过在末尾添加一个 empty() 并连接 observables 以将完整的信号添加到管道的末端......但后来我发现 empty() 已被弃用而且它似乎并没有起作用。

我的头一直在撞墙。感谢您的帮助。


编辑:

解决方案:

我误解了 collectionData() 的用途。它是一个监听器,所以它不应该完成。我在圆孔里放了一个方钉。解决方案是重构 xstate 机器,所以我根本不需要调用 onDone。

仍然感谢您的回答。


EDIT2:开始工作了。

take(1) 可以在 concatAll() 之前调用。我以为如果你先调用它,它就会结束流,但事实并非如此。管道中的其余运算符仍然适用。所以我 take(1) 获取单个数组,使用 concatAll() 将数组展平为单个对象流,然后将该数据映射到触发 STORE 操作的新对象。然后存储操作将数据设置到机器的上下文中。

export const tribeMachine = Machine({
    id: 'council',
    initial: 'init',
    context: {
        tribes: {},
        markers: []
    },
    states: {
        init: {
            invoke: {
                id: 'gettribes',
                src: () => collectionData(database.collection('tribes')).pipe(
                    take(1),
                    concatAll(),
                    map(value => ({ type: 'TRIBESTORE', value })),
                ),
                onDone: 'loaded'
            },
            on: {
                TRIBESTORE: {
                    actions: ['storetribes', 'logtribes']
                },
                CANCEL: 'loaded'
            }
        },
        loaded: {
        },
        error: {
        }
    }
},
    {
        actions: {
            storetribes: assign((context, event) => {
                return {
                    tribes: {
                        ...context.tribes,
                        [event.value.id]: event.value
                     },
                     markers: [
                         ...context.markers,
                         {
                             lat: event.value.lat,
                             lng: event.value.lng,
                             title: event.value.tribeName
                         }
                        ]
                     }
            })
        }
    }
)

感谢大家的帮助!

Observables 可以随着时间的推移 return 多个值,因此由 collectionData() 决定何时完成(即调用 complete())。

但是,如果你只想从 observable 中取 1 个值,你可以尝试:

  collectionData(database.collection("tribes")).pipe(
              take(1),
              concatAll(),
              map(x => ({ type: "STORE", x }))
            ),

一旦您从 collectionData() 中获取 1 个值,这将导致 observable 完成。

注意:这可能不是最佳解决方案,因为它取决于您使用的可观察流的工作方式。我只是强调您可以使用 take(1) 只取 1 个值并完成源可观察。