来自不同调度作业的相同列版本

Same column edition from different scheduling jobs

我有一个 JSON 对象存储在 table > column 上。 JSON 对象的示例是:

{
  a :{
    data: [1, 2, 3]
  },
  b :{
    data: [4, 5, 6]
  }
}

而且我有用于在 a.data (named scheduler A)b.data (named scheduler B) 上附加值的调度程序。调度程序正在研究从 table > column 获取 JSON 值并附加内容的方式。

问题:

这里,scheduler Ascheduler B没有任何同步机制。因此,当 scheduler Ascheduler B 附加值事件同时发生时,scheduler A 输出将被 scheduler B 输出覆盖,反之亦然。

处理该同步的机制是什么?由于 JSON 值的行为是动态的,我也无法在列上拆分 JSON 对象,所以我必须处理 JSON 格式。

对于最简单的情况,您可以同步数据库访问(例如,为 reading/writing 数据库设置一个专用对象,在 accessing/modifying json 数据的调度程序中使用此对象并在此对象上同步在调度程序中)。

像这样:

MockDB class

public class MockDB {

    private JSONObject json;

    public MockDB() {
        this.json = fillJSON();
    }

    // fill JSON object with test data
    private JSONObject fillJSON() {
        JSONObject json = new JSONObject();

        JSONObject map = new JSONObject();
        map.put("data", Arrays.asList(1));
        json.put("a", map);

        map = new JSONObject();
        map.put("data", Arrays.asList(11));
        json.put("b", map);

        return json;
    }

    public JSONObject getJSON() {
        return cloneJson(json);
    }

    public void setJSON(JSONObject newJson) {
        this.json = cloneJson(newJson);
    }

    // make a deep copy of JSON object
    private JSONObject cloneJson(JSONObject jsonObj) {
        JSONObject newJson = new JSONObject();
        for(Object key : jsonObj.keySet()) {
            if (jsonObj.get(key) instanceof JSONObject) {
                newJson.put(key, cloneJson((JSONObject) jsonObj.get(key)));
            } else if (jsonObj.get(key) instanceof JSONArray) {
                newJson.put(key, ((JSONArray)jsonObj.get(key)).clone());
            } else {
                newJson.put(key, jsonObj.get(key));
            }
        }
        return newJson;
    }
}

字段更新器class

public class ScheduledUpdater implements Runnable {

    private final MockDB database;
    private final String field;

    public ScheduledUpdater(MockDB database, String field) {
        this.database = database;
        this.field = field;
    }

    @Override
    public void run() {
        // here we should synchronize on a whole DB access object 
        // as we need get & set to be atomic together
        JSONObject json;
        synchronized (database) {
            json = database.getJSON();

            JSONObject xData;
            xData = (JSONObject) json.get(field);
            Object obj = xData.get("data");
            List<Integer> array = new ArrayList<>((List<Integer>) obj);
            array.add(Collections.max(array) + 1); // add new item to json array
            xData.put("data", array);

            database.setJSON(json);
        }
        printValues(json);
    }

    private void printValues(JSONObject json) {
        JSONObject    ao = (JSONObject) json.get("a");
        List<Integer> ad = (List<Integer>) ao.get("data");
        JSONObject    bo = (JSONObject) json.get("b");
        List<Integer> bd = (List<Integer>) bo.get("data");
        System.out.println(String.format("a: %02d; b: %02d", Collections.max(ad), Collections.max(bd)));
    }

}

实际执行人

    public static void main(String [] args) throws InterruptedException {

        MockDB database = new MockDB();

        System.out.println("starting tasks\n");

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        executor.scheduleAtFixedRate(new ScheduledUpdater(database, "a"), 0, 1, TimeUnit.SECONDS);
        executor.scheduleAtFixedRate(new ScheduledUpdater(database, "b"), 0, 1, TimeUnit.SECONDS);

        // run test for 5 seconds
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();

        // let all threads to stop
        Thread.sleep(250);
        System.out.println("\ntasks stopped; json: " + database.getJSON());
    }

PS:此外,您还可以从 java.util.concurrent 包中检查同步原语。