使用 redux-logic 取消指定订阅者的订阅

Redux-logic subscription cancel for given subscriber

我正在尝试使用 redux-logic 中间件实现订阅。 想法如下:当从服务器获取数据时,为每个订阅者调用回调,将获取的数据作为参数传递。

// logic/subscriptions.js

/**
 * Logic that fetches the latest data for a series and passes it to every
 * callback stored in the state under `[seriesType][nextUpdateTime].callbacks`.
 *
 * `done()` is always called, whether the fetch succeeds, fails, or one of the
 * callbacks throws, so the logic never stays pending.
 */
const fetchLatestLogic = createLogic({
  type: FETCH_LATEST_DATA,
  latest: true,

  process({ getState, action }, dispatch, done) {
    const { seriesType, nextUpdateTime } = action.payload;
    // The callbacks are read from the state before the request is started.
    const { callbacks } = getState()[seriesType][nextUpdateTime];

    apiFetch(seriesType)
      .then((data) => {
        callbacks.forEach((callback) => callback(seriesType, data));
      })
      .catch((error) => {
        // Report the failure instead of leaving an unhandled rejection.
        console.error(`Fetching latest data for ${seriesType} failed`, error);
      })
      .then(() => done());
  },
});

/**
 * Logic that starts polling when a SUBSCRIPTIONS_SUBSCRIBE action arrives.
 *
 * Every `updateInterval` milliseconds a `fetchLatestData(seriesType,
 * nextUpdateTime)` action is dispatched. Any SUBSCRIPTIONS_REMOVE action
 * cancels the logic (via `cancelType`), and the timer is cleared at that point.
 */
const subscribeLogic = createLogic({
  type: SUBSCRIPTIONS_SUBSCRIBE,
  cancelType: SUBSCRIPTIONS_REMOVE,
  // The polling is intentionally long-running, so the "still running" warning is disabled.
  warnTimeout: 0,

  // `done` must be part of the signature: redux-logic derives multi-dispatch
  // mode from it. Without it the logic completes after the first dispatch and
  // every later interval tick is dropped. `done` is deliberately never called
  // here; the logic only ends through cancellation.
  process({ action, cancelled$ }, dispatch, done) {
    const { seriesType, nextUpdateTime, updateInterval, subscriberId, callback } = action.payload;
    const interval = setInterval(() => {
      dispatch(fetchLatestData(seriesType, nextUpdateTime));
    }, updateInterval);

    // Cancellation already stops further dispatches; this frees the timer.
    cancelled$.subscribe(() => {
      clearInterval(interval);
    });
  },
});

// reducers/subscriptions.js

import update from 'immutability-helper';

// Custom immutability-helper command `$autoArray`: applies the nested spec to
// the existing value, or to a fresh empty array when the value is falsy.
update.extend('$autoArray', (spec, target) => {
  const base = target ? target : [];
  return update(base, spec);
});

// Initial reducer state: one (initially empty) object per series type.
const initialState = {
  'SERIESTYPE1': {},
  'SERIESTYPE2': {},
};

/**
 * Returns an updated copy of `seriesSubscriptions` in which `subscriber` is
 * appended to the list stored under `time`. Built with immutability-helper and
 * the custom `$autoArray` command, so the input object is not mutated.
 */
const serieAddSubscriberForTime = (seriesSubscriptions, time, subscriber) => {
  const appendSubscriber = { $autoArray: { $push: [subscriber] } };
  return update(seriesSubscriptions, { [time]: appendSubscriber });
};

/**
 * Returns a copy of `seriesSubscriptions` without `subscriber` in the list
 * stored under `subscriptionTime`. Subscribers are matched by `subscriberId`.
 *
 * - When nothing is stored under `subscriptionTime`, the input is returned unchanged.
 * - When the list becomes empty, the `subscriptionTime` key is removed.
 */
const serieRemoveSubscriberForTime = (seriesSubscriptions, subscriptionTime, subscriber) => {
  const current = seriesSubscriptions[subscriptionTime];
  if (!current) {
    // No list for this time yet (e.g. the very first subscription): nothing to remove.
    return seriesSubscriptions;
  }
  const subscriptions = current.filter((s) => s.subscriberId !== subscriber.subscriberId);
  if (subscriptions.length === 0) {
    return update(seriesSubscriptions, { $unset: [subscriptionTime] });
  }
  return { ...seriesSubscriptions, [subscriptionTime]: subscriptions };
};

/**
 * Subscriptions reducer.
 *
 * SUBSCRIPTIONS_SUBSCRIBE: registers `subscriber` for `nextUpdateTime` under
 * the given `seriesType`. Any existing entry of the same subscriber at that
 * time is removed first, so subscribing twice does not create duplicates.
 * Every other action returns the state unchanged.
 */
export default function reducer(state = initialState, action) {
  switch (action.type) {
    case SUBSCRIPTIONS_SUBSCRIBE: {
      const { seriesType, nextUpdateTime, subscriber } = action.payload;
      // Remove first and add to that result. Computing both from the original
      // state and merging them would let the "added" version overwrite the removal.
      const withoutOld = serieRemoveSubscriberForTime(state[seriesType], nextUpdateTime, subscriber);
      const withNew = serieAddSubscriberForTime(withoutOld, nextUpdateTime, subscriber);
      // `$set` is required: a bare object would be read as a nested update spec,
      // and immutability-helper rejects arrays inside a spec.
      return update(state, { [seriesType]: { $set: withNew } });
    }
    default:
      return state;
  }
}

如何才能只取消指定订阅者正在运行的定时器(interval)?(并且不必把 intervalID 分派给 reducer 并保存在 state 中?)

因为只要分派与下面的 cancelType 匹配的 action:

cancelType: SUBSCRIPTIONS_REMOVE

就会清除我当前实现中所有订阅的全部定时器(interval)。

更新:实际上有更流畅的方式来执行取消逻辑。

cancelled$

是一个可观察对象,RxJS .subscribe() 接受三个函数作为参数:

[onNext] (Function): Function to invoke for each element in the observable sequence.
[onError] (Function): Function to invoke upon exceptional termination of the observable sequence.
[onCompleted] (Function): Function to invoke upon graceful termination of the observable sequence.

因此 onNext 函数的参数就是被发射的值;在我们的例子中它是 SUBSCRIPTIONS_REMOVE action,所以可以读取它的 payload,并根据 payload 只取消对应订阅者的定时器:

// Only stop polling when the cancelling action targets this subscriber and series.
// NOTE(review): redux-logic itself stops accepting dispatches from a cancelled
// logic, so confirm that non-matching subscribers really keep polling.
cancelled$.subscribe((cancelAction) => {
  if (cancelAction.payload.subscriberId === subscriberId &&
      cancelAction.payload.seriesType === seriesType) {
    // The timer was created with setInterval and stored in `interval`.
    clearInterval(interval);
  }
});