如何将发出的事件事件绑定到 redux-saga 中?

How to tie emitted events events into redux-saga?

我正在尝试使用 redux-saga to connect events from PouchDB to my React.js 应用程序,但我正在努力弄清楚如何将 PouchDB 发出的事件连接到我的 Saga。由于事件使用回调函数(我不能将生成器传递给它),我不能在回调中使用 yield put(),它在 ES2015 编译(使用 Webpack)后给出了奇怪的错误。

所以这就是我想要完成的,不起作用的部分在 replication.on('change' (info) => {})

function * startReplication (wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield call(wrapper.connect.bind(wrapper))

    // Returns a promise, or false.
    let replication = wrapper.replicate()

    if (replication) {
      replication.on('change', (info) => {
        yield put(replicationChange(info))
      })
    }
  }
}

export default [ startReplication ]

我们要解决的根本问题是事件发射器是 'push-based',而 sagas 是 'pull-based'。

如果您像这样订阅一个事件:replication.on('change', (info) => {}),那么只要 replication 事件发射器决定 推送 一个新值,就会执行回调.

对于 sagas,我们需要翻转控件。 saga 必须控制它何时决定响应可用的新更改信息。换句话说,传奇需要新信息。

下面是实现此目的的一种方法的示例:

function* startReplication(wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield apply(wrapper, wrapper.connect);
    let replication = wrapper.replicate()
    if (replication)
      yield call(monitorChangeEvents, replication);
  }
}

function* monitorChangeEvents(replication) {
  const stream = createReadableStreamOfChanges(replication);

  while (true) {
    const info = yield stream.read(); // Blocks until the promise resolves
    yield put(replicationChange(info));
  }
}

// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
  let deferred;

  replication.on('change', info => {
    if (!deferred) return;
    deferred.resolve(info);
    deferred = null;
  });

  return {
    read() {
      if (deferred)
        return deferred.promise;

      deferred = {};
      deferred.promise = new Promise(resolve => deferred.resolve = resolve);
      return deferred.promise;
    }
  };
}

这里有上面例子的JSbin:http://jsbin.com/cujudes/edit?js,console

您还应该看看 Yassine Elouafi 对类似问题的回答:

正如 Nirrek 所解释的那样,当您需要连接到推送数据源时,您必须为该源构建一个事件迭代器

我想补充一点,上述机制可以重复使用。所以我们不必为每个不同的源重新创建事件迭代器。

解决方案是使用 puttake 方法创建通用 通道 。您可以从生成器内部调用 take 方法并将 put 方法连接到数据源的侦听器接口。

这是一个可能的实现。请注意,如果没有人在等待消息(例如,生成器正忙于进行一些远程调用),通道会缓冲消息

function createChannel () {
  const messageQueue = []
  const resolveQueue = []

  function put (msg) {
    // anyone waiting for a message ?
    if (resolveQueue.length) {
      // deliver the message to the oldest one waiting (First In First Out)
      const nextResolve = resolveQueue.shift()
      nextResolve(msg)
    } else {
      // no one is waiting ? queue the event
      messageQueue.push(msg)
    }
  }

  // returns a Promise resolved with the next message
  function take () {
    // do we have queued messages ?
    if (messageQueue.length) {
      // deliver the oldest queued message
      return Promise.resolve(messageQueue.shift())
    } else {
      // no queued messages ? queue the taker until a message arrives
      return new Promise((resolve) => resolveQueue.push(resolve))
    }
  }

  return {
    take,
    put
  }
}

那么上面的频道可以随时用来收听外部推送数据源。举个例子

function createChangeChannel (replication) {
  const channel = createChannel()

  // every change event will call put on the channel
  replication.on('change', channel.put)
  return channel
}

function * startReplication (getState) {
  // Wait for the configuration to be set. This can happen multiple
  // times during the life cycle, for example when the user wants to
  // switch database/workspace.
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    let state = getState()
    let wrapper = state.database.wrapper

    // Wait for a connection to work.
    yield apply(wrapper, wrapper.connect)

    // Trigger replication, and keep the promise.
    let replication = wrapper.replicate()

    if (replication) {
      yield call(monitorChangeEvents, createChangeChannel(replication))
    }
  }
}

function * monitorChangeEvents (channel) {
  while (true) {
    const info = yield call(channel.take) // Blocks until the promise resolves
    yield put(databaseActions.replicationChange(info))
  }
}

感谢@Yassine Elouafi

我根据@Yassine Elouafi 的解决方案创建了简短的 MIT 许可通用频道实现作为 TypeScript 语言的 redux-saga 扩展。

// redux-saga/channels.ts
import { Saga } from 'redux-saga';
import { call, fork } from 'redux-saga/effects';

export interface IChannel<TMessage> {
    take(): Promise<TMessage>;
    put(message: TMessage): void;
}

export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
    while (true) {
        const message: TMessage = yield call(channel.take);
        yield fork(saga, message);
    }
}

export function createChannel<TMessage>(): IChannel<TMessage> {
    const messageQueue: TMessage[] = [];
    const resolveQueue: ((message: TMessage) => void)[] = [];

    function put(message: TMessage): void {
        if (resolveQueue.length) {
            const nextResolve = resolveQueue.shift();
            nextResolve(message);
        } else {
            messageQueue.push(message);
        }
    }

    function take(): Promise<TMessage> {
        if (messageQueue.length) {
            return Promise.resolve(messageQueue.shift());
        } else {
            return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
        }
    }

    return {
        take,
        put
    };
}

和redux-saga *takeEvery构造类似的例子用法

// example-socket-action-binding.ts
import { put } from 'redux-saga/effects';
import {
    createChannel,
    takeEvery as takeEveryChannelMessage
} from './redux-saga/channels';

export function* socketBindActions(
    socket: SocketIOClient.Socket
) {
    const socketChannel = createSocketChannel(socket);
    yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
        yield put(action);
    });
}

function createSocketChannel(socket: SocketIOClient.Socket) {
    const socketChannel = createChannel<IAction>();
    socket.on('action', (action: IAction) => socketChannel.put(action));
    return socketChannel;
}

我在使用 PouchDB 时也遇到了同样的问题,发现所提供的答案非常有用和有趣。然而,在 PouchDB 中有很多方法可以做同样的事情,我仔细研究了一下,发现了一种可能更容易推理的不同方法。

如果您不将侦听器附加到 db.change 请求,那么它 return 将任何更改数据直接发送给调用者并将 continuous: true 添加到选项将导致发出longpoll 而不是 return 直到发生一些变化。所以用下面的

也可以达到同样的效果
export function * monitorDbChanges() {
  var info = yield call([db, db.info]); // get reference to last change 
  let lastSeq = info.update_seq;

  while(true){
    try{
      var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
      if (changes){
        for(let i = 0; i < changes.results.length; i++){
          yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc});
        }
        lastSeq = changes.last_seq;
      }
    }catch (error){
      yield put({type: 'monitor-changes-error', err: error})
    }
  }
}

有一件事我没有追根究底。如果我用 change.results.forEach((change)=>{...}) 替换 for 循环,那么我会在 yield 上收到无效的语法错误。我假设这与迭代器使用中的一些冲突有关。

我们可以使用redux-saga的eventChannel

这是我的例子

// fetch history messages
function* watchMessageEventChannel(client) {
  const chan = eventChannel(emitter => {
    client.on('message', (message) => emitter(message));
    return () => {
      client.close().then(() => console.log('logout'));
    };
  });
  while (true) {
    const message = yield take(chan);
    yield put(receiveMessage(message));
  }
}

function* fetchMessageHistory(action) {
  const client = yield realtime.createIMClient('demo_uuid');
  // listen message event
  yield fork(watchMessageEventChannel, client);
}

请注意

默认情况下不缓冲 eventChannel 上的消息。如果你只想一个一个地处理message event,你不能在const message = yield take(chan);

之后使用阻塞调用

或者您必须向 eventChannel 工厂提供缓冲区,以便为通道指定缓冲策略(例如 eventChannel(subscriber, buffer))。有关详细信息,请参阅 redux-saga API docs