正确使用 Hazelcast EntryProcessor

Correct use of Hazelcast EntryProcessor

我们正在尝试找出在不使用悲观锁定的情况下使用 Hazelcast 的 IMap 的最佳方法。

EntryProcessor 似乎是正确的选择,但是我们需要应用两种不同类型的操作:'create' 当 containsKey 为假时,'update' 当 containsKey 为真时。

如何利用 EntryProcessor 来支持这些逻辑检查?

如果两个线程同时访问 containsKey() 而它 returns 对它们都是 false,我不希望它们都创建密钥。我希望第二个线程应用更新。

这是我们目前所拥有的:

public void put(String key, Object value) {

    IMap<String, Object> map = getMap();

    if (!map.containsKey(key)) {
           // create key here
    } else {
        // update existing value here
        // ...
        map.executeOnKey(key, new TransactionEntryProcessor({my_new_value}));
    }
}

private static class MyEntryProcessor implements
        EntryProcessor<String, Object>, EntryBackupProcessor<String, Object>, Serializable {

    private static final long serialVersionUID = // blah blah

    private static final ThreadLocal<Object> entryToSet = new ThreadLocal<>();

    MyEntryProcessor(Object entryToSet) {
        MyEntryProcessor.entryToSet.set(entryToSet);
    }

    @Override
    public Object process(Map.Entry<String, Object> entry) {
        entry.setValue(entryToSet.get());
        return entry.getValue();
    }

    @Override
    public EntryBackupProcessor<String, Object> getBackupProcessor() {
        return MyEntryProcessor.this;
    }

    @Override
    public void processBackup(Map.Entry<String, Object> entry) {
        entry.setValue(entryToSet.get());
    }
}

可以看到两个线程可以同时进入put方法调用containsKey。第二个将覆盖第一个的结果。

EntryProcessor 根据定义是在条目本身上执行的处理逻辑,消除了 serializing/deserializing 值的需要。在内部,EP 由分区线程执行,其中一个分区线程负责多个分区。当 EP 到达 HC 时,它由密钥所属分区的所有者线程挑选。一旦处理完成,分区线程就准备好接受和执行其他任务(这很可能是由另一个线程提交的相同密钥的相同 EP)。因此,看起来可能是这样,但 EP 不应用作悲观锁定的替代方法。

如果您坚持并非常热衷于为此使用 EP,那么您可以尝试在 process 方法中放置一个空检查。像这样:

    public Object process(Map.Entry<String, Object> entry) {
        if(null == entry.getValue()) {
            entry.setValue("value123");
        }
        return entry.getValue();
    }

这样会发生两件事: 1. 其他线程会等待分区线程再次可用 2. 由于该值已经存在,因此您不会覆盖任何内容