正确使用 Hazelcast EntryProcessor
Correct use of Hazelcast EntryProcessor
我们正在尝试找出在不使用悲观锁定的情况下使用 Hazelcast 的 IMap 的最佳方法。
EntryProcessor 似乎是正确的选择,但是我们需要应用两种不同类型的操作:'create' 当 containsKey 为假时,'update' 当 containsKey 为真时。
如何利用 EntryProcessor 来支持这些逻辑检查?
如果两个线程同时访问 containsKey() 而它 returns 对它们都是 false,我不希望它们都创建密钥。我希望第二个线程应用更新。
这是我们目前所拥有的:
public void put(String key, Object value) {
IMap<String, Object> map = getMap();
if (!map.containsKey(key)) {
// create key here
} else {
// update existing value here
// ...
map.executeOnKey(key, new TransactionEntryProcessor({my_new_value}));
}
}
private static class MyEntryProcessor implements
EntryProcessor<String, Object>, EntryBackupProcessor<String, Object>, Serializable {
private static final long serialVersionUID = // blah blah
private static final ThreadLocal<Object> entryToSet = new ThreadLocal<>();
MyEntryProcessor(Object entryToSet) {
MyEntryProcessor.entryToSet.set(entryToSet);
}
@Override
public Object process(Map.Entry<String, Object> entry) {
entry.setValue(entryToSet.get());
return entry.getValue();
}
@Override
public EntryBackupProcessor<String, Object> getBackupProcessor() {
return MyEntryProcessor.this;
}
@Override
public void processBackup(Map.Entry<String, Object> entry) {
entry.setValue(entryToSet.get());
}
}
可以看到两个线程可以同时进入put方法调用containsKey。第二个将覆盖第一个的结果。
EntryProcessor 根据定义是在条目本身上执行的处理逻辑,消除了 serializing/deserializing 值的需要。在内部,EP 由分区线程执行,其中一个分区线程负责多个分区。当 EP 到达 HC 时,它由密钥所属分区的所有者线程挑选。一旦处理完成,分区线程就准备好接受和执行其他任务(这很可能是由另一个线程提交的相同密钥的相同 EP)。因此,看起来可能是这样,但 EP 不应用作悲观锁定的替代方法。
如果您坚持并非常热衷于为此使用 EP,那么您可以尝试在 process 方法中放置一个空检查。像这样:
public Object process(Map.Entry<String, Object> entry) {
if(null == entry.getValue()) {
entry.setValue("value123");
}
return entry.getValue();
}
这样会发生两件事:
1. 其他线程会等待分区线程再次可用
2. 由于该值已经存在,因此您不会覆盖任何内容
我们正在尝试找出在不使用悲观锁定的情况下使用 Hazelcast 的 IMap 的最佳方法。
EntryProcessor 似乎是正确的选择,但是我们需要应用两种不同类型的操作:'create' 当 containsKey 为假时,'update' 当 containsKey 为真时。
如何利用 EntryProcessor 来支持这些逻辑检查?
如果两个线程同时访问 containsKey() 而它 returns 对它们都是 false,我不希望它们都创建密钥。我希望第二个线程应用更新。
这是我们目前所拥有的:
public void put(String key, Object value) {
IMap<String, Object> map = getMap();
if (!map.containsKey(key)) {
// create key here
} else {
// update existing value here
// ...
map.executeOnKey(key, new TransactionEntryProcessor({my_new_value}));
}
}
private static class MyEntryProcessor implements
EntryProcessor<String, Object>, EntryBackupProcessor<String, Object>, Serializable {
private static final long serialVersionUID = // blah blah
private static final ThreadLocal<Object> entryToSet = new ThreadLocal<>();
MyEntryProcessor(Object entryToSet) {
MyEntryProcessor.entryToSet.set(entryToSet);
}
@Override
public Object process(Map.Entry<String, Object> entry) {
entry.setValue(entryToSet.get());
return entry.getValue();
}
@Override
public EntryBackupProcessor<String, Object> getBackupProcessor() {
return MyEntryProcessor.this;
}
@Override
public void processBackup(Map.Entry<String, Object> entry) {
entry.setValue(entryToSet.get());
}
}
可以看到两个线程可以同时进入put方法调用containsKey。第二个将覆盖第一个的结果。
EntryProcessor 根据定义是在条目本身上执行的处理逻辑,消除了 serializing/deserializing 值的需要。在内部,EP 由分区线程执行,其中一个分区线程负责多个分区。当 EP 到达 HC 时,它由密钥所属分区的所有者线程挑选。一旦处理完成,分区线程就准备好接受和执行其他任务(这很可能是由另一个线程提交的相同密钥的相同 EP)。因此,看起来可能是这样,但 EP 不应用作悲观锁定的替代方法。
如果您坚持并非常热衷于为此使用 EP,那么您可以尝试在 process 方法中放置一个空检查。像这样:
public Object process(Map.Entry<String, Object> entry) {
if(null == entry.getValue()) {
entry.setValue("value123");
}
return entry.getValue();
}
这样会发生两件事: 1. 其他线程会等待分区线程再次可用 2. 由于该值已经存在,因此您不会覆盖任何内容