导致竞争条件的环回事件流

loopback event stream causing race condition

我的总体目标是观察我的数据库是否有任何更改,并自动将这些更改广播给连接到我网站的任何用户。 我看到的问题是我有一个操作触发对我的数据库的 post 请求,然后事件流同时被触发,因为我正在观察的模型已经改变。这导致初始动作完成,事件流触发的动作在完成之前被中断。

这是在我的数据库中创建新博客 post 条目时触发的第一个操作

export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
    return {
        type: 'TOPIC_SUBMIT',
        payload: axios({
            method: 'post',
            url: `/api/blogPosts`,
            data: {
                "blogTitle": newTopic,
                "blogBody": newTopicBody,
                "date": date,
                "upVotes": 0,
                "numComments": 0,
                "voteNames": [],
                "memberId": memberId,
                "steamNameId": name
            }
        })
            .then(response => {
                return response.data
            })
            .catch(err => err)
    }
}

// this is the boot script that creates the change stream

var es = require('event-stream');
module.exports = function (app) {
    console.log('realtime boot script')
    var BlogPost = app.models.BlogPost;
    BlogPost.createChangeStream(function (err, changes) {
        changes.pipe(es.stringify()).pipe(process.stdout);
    });
}

// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end

componentDidMount() {
        const { dispatch } = this.props;
          let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
          let src = new EventSource(urlToChangeStream);
          src.addEventListener('data', function (msg) {
              let data = JSON.parse(msg.data);
          dispatch(liveChangeBlogs(data))
          });

我期望 'TOPIC_SUBMIT' 操作应该 return 在事件侦听器调度 'liveChangeBlogs' 操作之前完成 这是我在环回事件流 https://loopback.io/doc/en/lb3/Realtime-server-sent-events.html

上找到的文档

I am expecting that the 'TOPIC_SUBMIT' action should return fulfilled before the 'liveChangeBlogs' action is dispatched by the event listener

恐怕这是不可能的。即使 LoopBack 服务器在发送对 POST 请求的响应之前拒绝发送 event-stream 条目,它仍然无法保证客户端将接收(并处理!)响应在处理 event-stream 条目之前发送到 POST 请求。

我的建议是跟踪您客户端中的 in-flight 个请求,并删除 event-stream 个由同一客户端所做更改的条目。

大致如下:

const pendingRequests = [];

export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
    const data = {
                "blogTitle": newTopic,
                "blogBody": newTopicBody,
                "date": date,
                "upVotes": 0,
                "numComments": 0,
                "voteNames": [],
                "memberId": memberId,
                "steamNameId": name
            };
    const entry = {type: 'CREATE', data}
    pendingRequests.push(entry);

    return {
        type: 'TOPIC_SUBMIT',
        payload: axios({
            method: 'post',
            url: `/api/blogPosts`,
            data,

        })
            .then(response => {
                return response.data
            })
            .catch(err => err)
            .finally(() => {
              // remove the entry from pending requests
              const ix = pendingRequests.indexOf(entry);
              if (ix > -1) pendingRequests.splice(ix, 1);
            })
    }
}

// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end
// (...)
          src.addEventListener('data', function (msg) {
              let data = JSON.parse(msg.data);
              const ours = pendingRequests.find(it => {
                it.type === data.type && /* check fields that uniquely identify model instance and/or the change being made */
              });
              if (ours) {
                // this event was triggered by us, discard it
                return; 
              }
          dispatch(liveChangeBlogs(data))
          });

我最终使用 Redux Thunk 解决了这个问题,向我的 componentDidMount 添加了一个 setTimeout 和一个闭包。 topicSubmit 操作和启动脚本没有改变。不确定 setTimeout 是否是正确的方法,但这是我能想到的绕过比赛案例的唯一方法。

 componentDidMount() {
        const { dispatch } = this.props;
        const newBlog = this.handleNewBlog;
        let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
        let src = new EventSource(urlToChangeStream);
        src.addEventListener('data', function (msg) {
            newBlog(msg)
        });

        const newThread = this.handleNewThread;
        let urlToChangeStream2 = '/api/threads/change-stream?_format=event-stream';
        let src2 = new EventSource(urlToChangeStream2);
        src2.addEventListener('data', function (msg) {
            newThread(msg)
        });
        dispatch(getBlogs());
    }

  handleNewBlog(msg) {
        const { dispatch } = this.props;
        let data = JSON.parse(msg.data);
        if(data.data == undefined) {
            setTimeout(() => {
                dispatch(getBlogs());
            }, 1000);
        } else {
            setTimeout(() => {
                dispatch(liveChangeBlogs(data));
            }, 1000);
        }
    }

 handleNewThread(msg) {
        const { dispatch, viewingThreadId } = this.props;
        let data2 = JSON.parse(msg.data);
        console.log('data2: ', data2)
        if (data2.type == 'remove') {
            return dispatch(getThreadsById(viewingThreadId))
        }
        let id = data2.data.blogPostId
        setTimeout(() => {
            if (viewingThreadId === id) {
                dispatch(getThreadsById(id));
            } else {
                return
            }
        }, 1000);
    }