导致竞争条件的环回事件流
loopback event stream causing race condition
我的总体目标是观察我的数据库是否有任何更改,并自动将这些更改广播给连接到我网站的任何用户。
我看到的问题是我有一个操作触发对我的数据库的 post 请求,然后事件流同时被触发,因为我正在观察的模型已经改变。这导致初始动作完成,事件流触发的动作在完成之前被中断。
这是在我的数据库中创建新博客 post 条目时触发的第一个操作
export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
return {
type: 'TOPIC_SUBMIT',
payload: axios({
method: 'post',
url: `/api/blogPosts`,
data: {
"blogTitle": newTopic,
"blogBody": newTopicBody,
"date": date,
"upVotes": 0,
"numComments": 0,
"voteNames": [],
"memberId": memberId,
"steamNameId": name
}
})
.then(response => {
return response.data
})
.catch(err => err)
}
}
// this is the boot script that creates the change stream
var es = require('event-stream');
module.exports = function (app) {
console.log('realtime boot script')
var BlogPost = app.models.BlogPost;
BlogPost.createChangeStream(function (err, changes) {
changes.pipe(es.stringify()).pipe(process.stdout);
});
}
// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end
componentDidMount() {
const { dispatch } = this.props;
let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
let src = new EventSource(urlToChangeStream);
src.addEventListener('data', function (msg) {
let data = JSON.parse(msg.data);
dispatch(liveChangeBlogs(data))
});
我期望 'TOPIC_SUBMIT' 操作应该 return 在事件侦听器调度 'liveChangeBlogs' 操作之前完成
这是我在环回事件流 https://loopback.io/doc/en/lb3/Realtime-server-sent-events.html
上找到的文档
I am expecting that the 'TOPIC_SUBMIT' action should return fulfilled before the 'liveChangeBlogs' action is dispatched by the event listener
恐怕这是不可能的。即使 LoopBack 服务器在发送对 POST 请求的响应之前拒绝发送 event-stream 条目,它仍然无法保证客户端将接收(并处理!)响应在处理 event-stream 条目之前发送到 POST 请求。
我的建议是跟踪您客户端中的 in-flight 个请求,并删除 event-stream 个由同一客户端所做更改的条目。
大致如下:
const pendingRequests = [];
export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
const data = {
"blogTitle": newTopic,
"blogBody": newTopicBody,
"date": date,
"upVotes": 0,
"numComments": 0,
"voteNames": [],
"memberId": memberId,
"steamNameId": name
};
const entry = {type: 'CREATE', data}
pendingRequests.push(entry);
return {
type: 'TOPIC_SUBMIT',
payload: axios({
method: 'post',
url: `/api/blogPosts`,
data,
})
.then(response => {
return response.data
})
.catch(err => err)
.finally(() => {
// remove the entry from pending requests
const ix = pendingRequests.indexOf(entry);
if (ix > -1) pendingRequests.splice(ix, 1);
})
}
}
// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end
// (...)
src.addEventListener('data', function (msg) {
let data = JSON.parse(msg.data);
const ours = pendingRequests.find(it => {
it.type === data.type && /* check fields that uniquely identify model instance and/or the change being made */
});
if (ours) {
// this event was triggered by us, discard it
return;
}
dispatch(liveChangeBlogs(data))
});
我最终使用 Redux Thunk 解决了这个问题,向我的 componentDidMount 添加了一个 setTimeout 和一个闭包。 topicSubmit 操作和启动脚本没有改变。不确定 setTimeout 是否是正确的方法,但这是我能想到的绕过比赛案例的唯一方法。
componentDidMount() {
const { dispatch } = this.props;
const newBlog = this.handleNewBlog;
let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
let src = new EventSource(urlToChangeStream);
src.addEventListener('data', function (msg) {
newBlog(msg)
});
const newThread = this.handleNewThread;
let urlToChangeStream2 = '/api/threads/change-stream?_format=event-stream';
let src2 = new EventSource(urlToChangeStream2);
src2.addEventListener('data', function (msg) {
newThread(msg)
});
dispatch(getBlogs());
}
handleNewBlog(msg) {
const { dispatch } = this.props;
let data = JSON.parse(msg.data);
if(data.data == undefined) {
setTimeout(() => {
dispatch(getBlogs());
}, 1000);
} else {
setTimeout(() => {
dispatch(liveChangeBlogs(data));
}, 1000);
}
}
handleNewThread(msg) {
const { dispatch, viewingThreadId } = this.props;
let data2 = JSON.parse(msg.data);
console.log('data2: ', data2)
if (data2.type == 'remove') {
return dispatch(getThreadsById(viewingThreadId))
}
let id = data2.data.blogPostId
setTimeout(() => {
if (viewingThreadId === id) {
dispatch(getThreadsById(id));
} else {
return
}
}, 1000);
}
我的总体目标是观察我的数据库是否有任何更改,并自动将这些更改广播给连接到我网站的任何用户。 我看到的问题是我有一个操作触发对我的数据库的 post 请求,然后事件流同时被触发,因为我正在观察的模型已经改变。这导致初始动作完成,事件流触发的动作在完成之前被中断。
这是在我的数据库中创建新博客 post 条目时触发的第一个操作
export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
return {
type: 'TOPIC_SUBMIT',
payload: axios({
method: 'post',
url: `/api/blogPosts`,
data: {
"blogTitle": newTopic,
"blogBody": newTopicBody,
"date": date,
"upVotes": 0,
"numComments": 0,
"voteNames": [],
"memberId": memberId,
"steamNameId": name
}
})
.then(response => {
return response.data
})
.catch(err => err)
}
}
// this is the boot script that creates the change stream
var es = require('event-stream');
module.exports = function (app) {
console.log('realtime boot script')
var BlogPost = app.models.BlogPost;
BlogPost.createChangeStream(function (err, changes) {
changes.pipe(es.stringify()).pipe(process.stdout);
});
}
// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end
componentDidMount() {
const { dispatch } = this.props;
let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
let src = new EventSource(urlToChangeStream);
src.addEventListener('data', function (msg) {
let data = JSON.parse(msg.data);
dispatch(liveChangeBlogs(data))
});
我期望 'TOPIC_SUBMIT' 操作应该 return 在事件侦听器调度 'liveChangeBlogs' 操作之前完成 这是我在环回事件流 https://loopback.io/doc/en/lb3/Realtime-server-sent-events.html
上找到的文档I am expecting that the 'TOPIC_SUBMIT' action should return fulfilled before the 'liveChangeBlogs' action is dispatched by the event listener
恐怕这是不可能的。即使 LoopBack 服务器在发送对 POST 请求的响应之前拒绝发送 event-stream 条目,它仍然无法保证客户端将接收(并处理!)响应在处理 event-stream 条目之前发送到 POST 请求。
我的建议是跟踪您客户端中的 in-flight 个请求,并删除 event-stream 个由同一客户端所做更改的条目。
大致如下:
const pendingRequests = [];
export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
const data = {
"blogTitle": newTopic,
"blogBody": newTopicBody,
"date": date,
"upVotes": 0,
"numComments": 0,
"voteNames": [],
"memberId": memberId,
"steamNameId": name
};
const entry = {type: 'CREATE', data}
pendingRequests.push(entry);
return {
type: 'TOPIC_SUBMIT',
payload: axios({
method: 'post',
url: `/api/blogPosts`,
data,
})
.then(response => {
return response.data
})
.catch(err => err)
.finally(() => {
// remove the entry from pending requests
const ix = pendingRequests.indexOf(entry);
if (ix > -1) pendingRequests.splice(ix, 1);
})
}
}
// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end
// (...)
src.addEventListener('data', function (msg) {
let data = JSON.parse(msg.data);
const ours = pendingRequests.find(it => {
it.type === data.type && /* check fields that uniquely identify model instance and/or the change being made */
});
if (ours) {
// this event was triggered by us, discard it
return;
}
dispatch(liveChangeBlogs(data))
});
我最终使用 Redux Thunk 解决了这个问题,向我的 componentDidMount 添加了一个 setTimeout 和一个闭包。 topicSubmit 操作和启动脚本没有改变。不确定 setTimeout 是否是正确的方法,但这是我能想到的绕过比赛案例的唯一方法。
componentDidMount() {
const { dispatch } = this.props;
const newBlog = this.handleNewBlog;
let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
let src = new EventSource(urlToChangeStream);
src.addEventListener('data', function (msg) {
newBlog(msg)
});
const newThread = this.handleNewThread;
let urlToChangeStream2 = '/api/threads/change-stream?_format=event-stream';
let src2 = new EventSource(urlToChangeStream2);
src2.addEventListener('data', function (msg) {
newThread(msg)
});
dispatch(getBlogs());
}
handleNewBlog(msg) {
const { dispatch } = this.props;
let data = JSON.parse(msg.data);
if(data.data == undefined) {
setTimeout(() => {
dispatch(getBlogs());
}, 1000);
} else {
setTimeout(() => {
dispatch(liveChangeBlogs(data));
}, 1000);
}
}
handleNewThread(msg) {
const { dispatch, viewingThreadId } = this.props;
let data2 = JSON.parse(msg.data);
console.log('data2: ', data2)
if (data2.type == 'remove') {
return dispatch(getThreadsById(viewingThreadId))
}
let id = data2.data.blogPostId
setTimeout(() => {
if (viewingThreadId === id) {
dispatch(getThreadsById(id));
} else {
return
}
}, 1000);
}