如何将发出的事件事件绑定到 redux-saga 中?
How to tie emitted events events into redux-saga?
我正在尝试使用 redux-saga to connect events from PouchDB to my React.js 应用程序,但我正在努力弄清楚如何将 PouchDB 发出的事件连接到我的 Saga。由于事件使用回调函数(我不能将生成器传递给它),我不能在回调中使用 yield put()
,它在 ES2015 编译(使用 Webpack)后给出了奇怪的错误。
所以这就是我想要完成的,不起作用的部分在 replication.on('change' (info) => {})
。
function * startReplication (wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield call(wrapper.connect.bind(wrapper))
// Returns a promise, or false.
let replication = wrapper.replicate()
if (replication) {
replication.on('change', (info) => {
yield put(replicationChange(info))
})
}
}
}
export default [ startReplication ]
我们要解决的根本问题是事件发射器是 'push-based',而 sagas 是 'pull-based'。
如果您像这样订阅一个事件:replication.on('change', (info) => {})
,那么只要 replication
事件发射器决定 推送 一个新值,就会执行回调.
对于 sagas,我们需要翻转控件。 saga 必须控制它何时决定响应可用的新更改信息。换句话说,传奇需要拉新信息。
下面是实现此目的的一种方法的示例:
function* startReplication(wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield apply(wrapper, wrapper.connect);
let replication = wrapper.replicate()
if (replication)
yield call(monitorChangeEvents, replication);
}
}
function* monitorChangeEvents(replication) {
const stream = createReadableStreamOfChanges(replication);
while (true) {
const info = yield stream.read(); // Blocks until the promise resolves
yield put(replicationChange(info));
}
}
// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
let deferred;
replication.on('change', info => {
if (!deferred) return;
deferred.resolve(info);
deferred = null;
});
return {
read() {
if (deferred)
return deferred.promise;
deferred = {};
deferred.promise = new Promise(resolve => deferred.resolve = resolve);
return deferred.promise;
}
};
}
这里有上面例子的JSbin:http://jsbin.com/cujudes/edit?js,console
您还应该看看 Yassine Elouafi 对类似问题的回答:
正如 Nirrek 所解释的那样,当您需要连接到推送数据源时,您必须为该源构建一个事件迭代器。
我想补充一点,上述机制可以重复使用。所以我们不必为每个不同的源重新创建事件迭代器。
解决方案是使用 put
和 take
方法创建通用 通道 。您可以从生成器内部调用 take
方法并将 put
方法连接到数据源的侦听器接口。
这是一个可能的实现。请注意,如果没有人在等待消息(例如,生成器正忙于进行一些远程调用),通道会缓冲消息
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
那么上面的频道可以随时用来收听外部推送数据源。举个例子
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on('change', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}
感谢@Yassine Elouafi
我根据@Yassine Elouafi 的解决方案创建了简短的 MIT 许可通用频道实现作为 TypeScript 语言的 redux-saga 扩展。
// redux-saga/channels.ts
import { Saga } from 'redux-saga';
import { call, fork } from 'redux-saga/effects';
export interface IChannel<TMessage> {
take(): Promise<TMessage>;
put(message: TMessage): void;
}
export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
while (true) {
const message: TMessage = yield call(channel.take);
yield fork(saga, message);
}
}
export function createChannel<TMessage>(): IChannel<TMessage> {
const messageQueue: TMessage[] = [];
const resolveQueue: ((message: TMessage) => void)[] = [];
function put(message: TMessage): void {
if (resolveQueue.length) {
const nextResolve = resolveQueue.shift();
nextResolve(message);
} else {
messageQueue.push(message);
}
}
function take(): Promise<TMessage> {
if (messageQueue.length) {
return Promise.resolve(messageQueue.shift());
} else {
return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
}
}
return {
take,
put
};
}
和redux-saga *takeEvery构造类似的例子用法
// example-socket-action-binding.ts
import { put } from 'redux-saga/effects';
import {
createChannel,
takeEvery as takeEveryChannelMessage
} from './redux-saga/channels';
export function* socketBindActions(
socket: SocketIOClient.Socket
) {
const socketChannel = createSocketChannel(socket);
yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
yield put(action);
});
}
function createSocketChannel(socket: SocketIOClient.Socket) {
const socketChannel = createChannel<IAction>();
socket.on('action', (action: IAction) => socketChannel.put(action));
return socketChannel;
}
我在使用 PouchDB 时也遇到了同样的问题,发现所提供的答案非常有用和有趣。然而,在 PouchDB 中有很多方法可以做同样的事情,我仔细研究了一下,发现了一种可能更容易推理的不同方法。
如果您不将侦听器附加到 db.change
请求,那么它 return 将任何更改数据直接发送给调用者并将 continuous: true
添加到选项将导致发出longpoll 而不是 return 直到发生一些变化。所以用下面的
也可以达到同样的效果
export function * monitorDbChanges() {
var info = yield call([db, db.info]); // get reference to last change
let lastSeq = info.update_seq;
while(true){
try{
var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
if (changes){
for(let i = 0; i < changes.results.length; i++){
yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc});
}
lastSeq = changes.last_seq;
}
}catch (error){
yield put({type: 'monitor-changes-error', err: error})
}
}
}
有一件事我没有追根究底。如果我用 change.results.forEach((change)=>{...})
替换 for
循环,那么我会在 yield
上收到无效的语法错误。我假设这与迭代器使用中的一些冲突有关。
我们可以使用redux-saga的eventChannel
这是我的例子
// fetch history messages
function* watchMessageEventChannel(client) {
const chan = eventChannel(emitter => {
client.on('message', (message) => emitter(message));
return () => {
client.close().then(() => console.log('logout'));
};
});
while (true) {
const message = yield take(chan);
yield put(receiveMessage(message));
}
}
function* fetchMessageHistory(action) {
const client = yield realtime.createIMClient('demo_uuid');
// listen message event
yield fork(watchMessageEventChannel, client);
}
请注意:
默认情况下不缓冲 eventChannel 上的消息。如果你只想一个一个地处理message event
,你不能在const message = yield take(chan);
之后使用阻塞调用
或者您必须向 eventChannel 工厂提供缓冲区,以便为通道指定缓冲策略(例如 eventChannel(subscriber, buffer))。有关详细信息,请参阅 redux-saga API docs
我正在尝试使用 redux-saga to connect events from PouchDB to my React.js 应用程序,但我正在努力弄清楚如何将 PouchDB 发出的事件连接到我的 Saga。由于事件使用回调函数(我不能将生成器传递给它),我不能在回调中使用 yield put()
,它在 ES2015 编译(使用 Webpack)后给出了奇怪的错误。
所以这就是我想要完成的,不起作用的部分在 replication.on('change' (info) => {})
。
function * startReplication (wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield call(wrapper.connect.bind(wrapper))
// Returns a promise, or false.
let replication = wrapper.replicate()
if (replication) {
replication.on('change', (info) => {
yield put(replicationChange(info))
})
}
}
}
export default [ startReplication ]
我们要解决的根本问题是事件发射器是 'push-based',而 sagas 是 'pull-based'。
如果您像这样订阅一个事件:replication.on('change', (info) => {})
,那么只要 replication
事件发射器决定 推送 一个新值,就会执行回调.
对于 sagas,我们需要翻转控件。 saga 必须控制它何时决定响应可用的新更改信息。换句话说,传奇需要拉新信息。
下面是实现此目的的一种方法的示例:
function* startReplication(wrapper) {
while (yield take(DATABASE_SET_CONFIGURATION)) {
yield apply(wrapper, wrapper.connect);
let replication = wrapper.replicate()
if (replication)
yield call(monitorChangeEvents, replication);
}
}
function* monitorChangeEvents(replication) {
const stream = createReadableStreamOfChanges(replication);
while (true) {
const info = yield stream.read(); // Blocks until the promise resolves
yield put(replicationChange(info));
}
}
// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
let deferred;
replication.on('change', info => {
if (!deferred) return;
deferred.resolve(info);
deferred = null;
});
return {
read() {
if (deferred)
return deferred.promise;
deferred = {};
deferred.promise = new Promise(resolve => deferred.resolve = resolve);
return deferred.promise;
}
};
}
这里有上面例子的JSbin:http://jsbin.com/cujudes/edit?js,console
您还应该看看 Yassine Elouafi 对类似问题的回答:
正如 Nirrek 所解释的那样,当您需要连接到推送数据源时,您必须为该源构建一个事件迭代器。
我想补充一点,上述机制可以重复使用。所以我们不必为每个不同的源重新创建事件迭代器。
解决方案是使用 put
和 take
方法创建通用 通道 。您可以从生成器内部调用 take
方法并将 put
方法连接到数据源的侦听器接口。
这是一个可能的实现。请注意,如果没有人在等待消息(例如,生成器正忙于进行一些远程调用),通道会缓冲消息
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
那么上面的频道可以随时用来收听外部推送数据源。举个例子
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on('change', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}
感谢@Yassine Elouafi
我根据@Yassine Elouafi 的解决方案创建了简短的 MIT 许可通用频道实现作为 TypeScript 语言的 redux-saga 扩展。
// redux-saga/channels.ts
import { Saga } from 'redux-saga';
import { call, fork } from 'redux-saga/effects';
export interface IChannel<TMessage> {
take(): Promise<TMessage>;
put(message: TMessage): void;
}
export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
while (true) {
const message: TMessage = yield call(channel.take);
yield fork(saga, message);
}
}
export function createChannel<TMessage>(): IChannel<TMessage> {
const messageQueue: TMessage[] = [];
const resolveQueue: ((message: TMessage) => void)[] = [];
function put(message: TMessage): void {
if (resolveQueue.length) {
const nextResolve = resolveQueue.shift();
nextResolve(message);
} else {
messageQueue.push(message);
}
}
function take(): Promise<TMessage> {
if (messageQueue.length) {
return Promise.resolve(messageQueue.shift());
} else {
return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
}
}
return {
take,
put
};
}
和redux-saga *takeEvery构造类似的例子用法
// example-socket-action-binding.ts
import { put } from 'redux-saga/effects';
import {
createChannel,
takeEvery as takeEveryChannelMessage
} from './redux-saga/channels';
export function* socketBindActions(
socket: SocketIOClient.Socket
) {
const socketChannel = createSocketChannel(socket);
yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
yield put(action);
});
}
function createSocketChannel(socket: SocketIOClient.Socket) {
const socketChannel = createChannel<IAction>();
socket.on('action', (action: IAction) => socketChannel.put(action));
return socketChannel;
}
我在使用 PouchDB 时也遇到了同样的问题,发现所提供的答案非常有用和有趣。然而,在 PouchDB 中有很多方法可以做同样的事情,我仔细研究了一下,发现了一种可能更容易推理的不同方法。
如果您不将侦听器附加到 db.change
请求,那么它 return 将任何更改数据直接发送给调用者并将 continuous: true
添加到选项将导致发出longpoll 而不是 return 直到发生一些变化。所以用下面的
export function * monitorDbChanges() {
var info = yield call([db, db.info]); // get reference to last change
let lastSeq = info.update_seq;
while(true){
try{
var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
if (changes){
for(let i = 0; i < changes.results.length; i++){
yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc});
}
lastSeq = changes.last_seq;
}
}catch (error){
yield put({type: 'monitor-changes-error', err: error})
}
}
}
有一件事我没有追根究底。如果我用 change.results.forEach((change)=>{...})
替换 for
循环,那么我会在 yield
上收到无效的语法错误。我假设这与迭代器使用中的一些冲突有关。
我们可以使用redux-saga的eventChannel
这是我的例子
// fetch history messages
function* watchMessageEventChannel(client) {
const chan = eventChannel(emitter => {
client.on('message', (message) => emitter(message));
return () => {
client.close().then(() => console.log('logout'));
};
});
while (true) {
const message = yield take(chan);
yield put(receiveMessage(message));
}
}
function* fetchMessageHistory(action) {
const client = yield realtime.createIMClient('demo_uuid');
// listen message event
yield fork(watchMessageEventChannel, client);
}
请注意:
默认情况下不缓冲 eventChannel 上的消息。如果你只想一个一个地处理message event
,你不能在const message = yield take(chan);
或者您必须向 eventChannel 工厂提供缓冲区,以便为通道指定缓冲策略(例如 eventChannel(subscriber, buffer))。有关详细信息,请参阅 redux-saga API docs