由于竞争条件,MobX 动作计划/执行顺序未保留
MobX action schedule / execution order isn't preserved due to race condition
这里有一个 运行 示例,说明我目前所获得的内容:
https://codesandbox.io/s/github/BruceL33t/mobx-action-synchronous-execution-order/tree/master/
store.js:
import { observable, action } from "mobx";
import Sensor from "../models/Sensor";
export default class RootStore {
@observable sensors = new Map();
constructor() {
let self = this;
const sensorIds = [
"sensor1",
"sensor2",
"sensor3",
"sensor4",
"sensor5",
"sensor6",
"sensor7",
"sensor8",
"sensor9",
"sensor10"
];
for (let sensor of sensorIds) {
self.sensors.set(sensor, new Sensor(5));
}
// setInterval simulates some incoming data (originally from SignalR, and roughly each second)
setInterval(function() {
let out = {};
const x = +new Date(); // unix timestamp
for (let sensor of sensorIds) {
const y = Math.floor(Math.random() * 10000) + 1;
const m = { x: x, y: y };
out[sensor] = m;
}
self.addMeasurement(out); // the problem starts here.
}, 1000);
}
// the problem!
@action
addMeasurement(sensorMeasurementMap) {
let self = this;
// this timeout is to try and simulate a race condition
// since each measurement is incoming each second,
// here some of them will take as long as 6 seconds to add,
// due to the timeout.
// the point is that they should always be added,
// in the order they were called in.
// so if the first measurement takes 20 seconds to be added,
// the next measurements that were received on 2, 3, 4, 5..., 19th second etc,
// should all "wait" for the prev measurement, so they're added
// in the right order (order can be checked by timestamp, x)
setTimeout(() => {
const keys = self.sensors.keys();
if (keys.length === 0) {
// never really gonna happen, since we already set them above
} else {
for (const key in sensorMeasurementMap) {
if (self.sensors.keys().indexOf(key) > -1) {
self.sensors.get(key).add(sensorMeasurementMap[key]);
} else {
// also not gonna happen in this example
}
}
}
}, Math.floor(Math.random() * 20 + 1) * 1000);
}
}
Sensor.js:
import Queue from './Queue';
import {observable, action} from 'mobx';
export default class Sensor {
@observable queue;
constructor(n) {
this.n = n;
this.queue = new Queue(this.n);
}
@action add(measurement) {
this.queue.add(measurement);
}
}
Queue.js:
import {observable, action} from 'mobx';
export default class Queue {
@observable data;
constructor(maxSize) {
this.maxSize = maxSize;
this.size = 0;
this.data = [];
}
@action add(measurement) {
let removedItem = undefined;
if(this.size >= this.maxSize) {
let temp = this.data[0];
removedItem = temp && temp.y ? temp.y+'' : undefined;
this.data.shift();
}
this.data.push(measurement);
if (removedItem === undefined && this.size < this.maxSize) {
this.size++;
}
return removedItem;
}
}
代码中有一些注释,但您绝对需要查看输出 https://codesandbox.io/s/github/BruceL33t/mobx-action-synchronous-execution-order/tree/master/ 才能理解它。
这里我也试着解释一下,这是怎么回事。
这基本上是真实应用程序的一部分的过度简化版本,其中 setInterval 只是用来模拟 SignalR 事件处理程序以指示每秒传入的数据。传入数据是我们在 addMeasurement 操作上方的 setInterval 函数中创建的。
鉴于每秒接收到一些传入数据,我们希望将其添加到商店中的可观察地图传感器中。由于此数据用于在实际应用程序中绘制图表,因此我们需要确保它确实按照调用操作的顺序添加 - 无论操作需要多长时间才能完成。
在实际应用中,我看到数据被推送到 MobX 状态的顺序有些不一致,所以我将其隔离并将相关部分提取到这个示例中,并试图通过使用 setTimeout 夸大它一点func 在 addMeasurement 操作中。
由于每个数据都是每秒获取的,但有些测量可能需要长达 20 秒才能获取(这不现实,但为了清楚地显示竞争条件问题),就像现在的代码一样,经常发生我们结束想出类似的东西:
[
{"x":1519637083193,"y":4411},
{"x":1519637080192,"y":7562},
{"x":1519637084193,"y":1269},
{"x":1519637085192,"y":8916},
{"x":1519637081192,"y":7365}
]
这真的不应该发生,因为 1519637083193 比 1519637080192.
greater/later
当根据这些数据绘制图表并在之后对其进行排序时,这是一个真正的问题太昂贵了,所以我正在寻找一种方法来改进这段代码,这样我们就可以相信每个 addMeasurement 只会在上一个操作后被触发已经完全结束了。或者至少是一种以正确顺序更新 MobX 状态的方法
希望它有意义。
should all "wait" for the prev measurement, so they're added in the right order (order can be checked by timestamp, x).
你能详细说明一下吗?人们怎么知道将来不会收到比当前时间戳更大的时间戳,因此会无限期地等待?您正在寻找的不只是对测量数组的排序插入(而不是等待)吗?
如果排序插入不能解决问题,我可能会执行以下操作(未经测试):
lastAddition = Promise.resolve() // start with already finishied addition
addMeasurement(sensorMeasurementMap) {
this.lastAddition = this.lastAddition.then(() => {
return new Promise((resolve, reject) => {
setTimeout(action(() => {
const keys = self.sensors.keys();
if (keys.length === 0) {
// never really gonna happen, since we already set them above
} else {
for (const key in sensorMeasurementMap) {
if (self.sensors.keys().indexOf(key) > -1) {
self.sensors.get(key).add(sensorMeasurementMap[key]);
} else {
// also not gonna happen in this example
}
}
}
resolve()
}), Math.floor(Math.random() * 20 + 1) * 1000);
})
})
}
}
N.B.: 请注意,我把 action
移到了里面,因为你需要它在你实际修改状态的地方,而不是调度发生的地方
这里有一个 运行 示例,说明我目前所获得的内容:
https://codesandbox.io/s/github/BruceL33t/mobx-action-synchronous-execution-order/tree/master/
store.js:
import { observable, action } from "mobx";
import Sensor from "../models/Sensor";
export default class RootStore {
@observable sensors = new Map();
constructor() {
let self = this;
const sensorIds = [
"sensor1",
"sensor2",
"sensor3",
"sensor4",
"sensor5",
"sensor6",
"sensor7",
"sensor8",
"sensor9",
"sensor10"
];
for (let sensor of sensorIds) {
self.sensors.set(sensor, new Sensor(5));
}
// setInterval simulates some incoming data (originally from SignalR, and roughly each second)
setInterval(function() {
let out = {};
const x = +new Date(); // unix timestamp
for (let sensor of sensorIds) {
const y = Math.floor(Math.random() * 10000) + 1;
const m = { x: x, y: y };
out[sensor] = m;
}
self.addMeasurement(out); // the problem starts here.
}, 1000);
}
// the problem!
@action
addMeasurement(sensorMeasurementMap) {
let self = this;
// this timeout is to try and simulate a race condition
// since each measurement is incoming each second,
// here some of them will take as long as 6 seconds to add,
// due to the timeout.
// the point is that they should always be added,
// in the order they were called in.
// so if the first measurement takes 20 seconds to be added,
// the next measurements that were received on 2, 3, 4, 5..., 19th second etc,
// should all "wait" for the prev measurement, so they're added
// in the right order (order can be checked by timestamp, x)
setTimeout(() => {
const keys = self.sensors.keys();
if (keys.length === 0) {
// never really gonna happen, since we already set them above
} else {
for (const key in sensorMeasurementMap) {
if (self.sensors.keys().indexOf(key) > -1) {
self.sensors.get(key).add(sensorMeasurementMap[key]);
} else {
// also not gonna happen in this example
}
}
}
}, Math.floor(Math.random() * 20 + 1) * 1000);
}
}
Sensor.js:
import Queue from './Queue';
import {observable, action} from 'mobx';
export default class Sensor {
@observable queue;
constructor(n) {
this.n = n;
this.queue = new Queue(this.n);
}
@action add(measurement) {
this.queue.add(measurement);
}
}
Queue.js:
import {observable, action} from 'mobx';
export default class Queue {
@observable data;
constructor(maxSize) {
this.maxSize = maxSize;
this.size = 0;
this.data = [];
}
@action add(measurement) {
let removedItem = undefined;
if(this.size >= this.maxSize) {
let temp = this.data[0];
removedItem = temp && temp.y ? temp.y+'' : undefined;
this.data.shift();
}
this.data.push(measurement);
if (removedItem === undefined && this.size < this.maxSize) {
this.size++;
}
return removedItem;
}
}
代码中有一些注释,但您绝对需要查看输出 https://codesandbox.io/s/github/BruceL33t/mobx-action-synchronous-execution-order/tree/master/ 才能理解它。
这里我也试着解释一下,这是怎么回事。
这基本上是真实应用程序的一部分的过度简化版本,其中 setInterval 只是用来模拟 SignalR 事件处理程序以指示每秒传入的数据。传入数据是我们在 addMeasurement 操作上方的 setInterval 函数中创建的。
鉴于每秒接收到一些传入数据,我们希望将其添加到商店中的可观察地图传感器中。由于此数据用于在实际应用程序中绘制图表,因此我们需要确保它确实按照调用操作的顺序添加 - 无论操作需要多长时间才能完成。
在实际应用中,我看到数据被推送到 MobX 状态的顺序有些不一致,所以我将其隔离并将相关部分提取到这个示例中,并试图通过使用 setTimeout 夸大它一点func 在 addMeasurement 操作中。
由于每个数据都是每秒获取的,但有些测量可能需要长达 20 秒才能获取(这不现实,但为了清楚地显示竞争条件问题),就像现在的代码一样,经常发生我们结束想出类似的东西:
[
{"x":1519637083193,"y":4411},
{"x":1519637080192,"y":7562},
{"x":1519637084193,"y":1269},
{"x":1519637085192,"y":8916},
{"x":1519637081192,"y":7365}
]
这真的不应该发生,因为 1519637083193 比 1519637080192.
greater/later当根据这些数据绘制图表并在之后对其进行排序时,这是一个真正的问题太昂贵了,所以我正在寻找一种方法来改进这段代码,这样我们就可以相信每个 addMeasurement 只会在上一个操作后被触发已经完全结束了。或者至少是一种以正确顺序更新 MobX 状态的方法
希望它有意义。
should all "wait" for the prev measurement, so they're added in the right order (order can be checked by timestamp, x).
你能详细说明一下吗?人们怎么知道将来不会收到比当前时间戳更大的时间戳,因此会无限期地等待?您正在寻找的不只是对测量数组的排序插入(而不是等待)吗?
如果排序插入不能解决问题,我可能会执行以下操作(未经测试):
lastAddition = Promise.resolve() // start with already finishied addition
addMeasurement(sensorMeasurementMap) {
this.lastAddition = this.lastAddition.then(() => {
return new Promise((resolve, reject) => {
setTimeout(action(() => {
const keys = self.sensors.keys();
if (keys.length === 0) {
// never really gonna happen, since we already set them above
} else {
for (const key in sensorMeasurementMap) {
if (self.sensors.keys().indexOf(key) > -1) {
self.sensors.get(key).add(sensorMeasurementMap[key]);
} else {
// also not gonna happen in this example
}
}
}
resolve()
}), Math.floor(Math.random() * 20 + 1) * 1000);
})
})
}
}
N.B.: 请注意,我把 action
移到了里面,因为你需要它在你实际修改状态的地方,而不是调度发生的地方