Hazelcast/Coherence 具有每个键数据的网格计算 EntryProcessor
Hazelcast/Coherence Grid Computing EntryProcessor with data for each key
我想使用 hazelcast 或 coherence EntryProcessor 在不同节点上并行执行某些逻辑,其中密钥存储在缓存中。我看到我可以使用类似 sendToEachKey(EntryProcessor 进程)的东西。
当我还需要用逻辑发送一段数据来处理时,我的问题就来了,该数据也属于另一个系统并且我收到了它(例如在 http 请求中)。
当然我可以做类似 sendToEachKey(EntryProcessor(data) process) 的事情。但是,如果每个键的数据都不同,而我只想将他的数据发送到特定键进行处理,我该怎么做?为什么我要这样做是因为数据太大而且我有网络过载。
当然,如果我打开一个线程池将每个数据发送到每个键是可能的,但由于请求量很大,效率很低。
谢谢!
在 Hazelcast 中你会做 executeOnKeys(keys, new EntryProcessor(data))
,这太多了,因为数据太大了。
为什么不
executeOnKey(key1, new EntryProcessor(data1));
executeOnKey(key2, new EntryProcessor(data2));
executeOnKey(key3, new EntryProcessor(data3));
发送每个键需要的数据子集?
对于 Hazelcast,您可以检索所有值并发送它自己的每个键 EntryProcessor
,但是这会产生大量开销。
另一种选择是结合使用 EntryProcessor
和我们的分布式 ExecutorService
。
您将 Runnable
发送到 ExecutorService。在 Runnable
中,您检索本地键集,检索所有外部值(所有已经在节点本地的值),然后为每个本地键发出一个 EntryProcessor
。由于您已经在节点本地,因此不再有流量四处流动(除了备份,显然 :))。也就是说,您可能想要实现一个特定的 EntryProcessor
,它只传输更改后的值而不传输完整的处理器本身(以节省更多流量)。
在 Coherence 中,您可以使用 PartitionedService
来查找缓存键与集群成员的关联。然后您可以使用每个成员的数据调用入口处理器,使用 PartitionedFilter
确保数据仅发送给该成员。像这样:
// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) {
PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);
for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
Member member = dataForMember.getKey();
Map<String, Data> data = dataForMember.getValue();
PartitionSet partitions = partitionedService.getOwnedPartitions(member);
PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
EntryProcessor processor = new MyEntryProcessor(data);
cache.async().invokeAll(filter, processor);
}
}
Map<Member, Map<String, Data>> splitDataByMembers(
PartitionedService partitionedService,
Map<String, Data> externalData) {
Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();
for (Object member : partitionedService.getInfo().getServiceMembers()) {
dataForMembers.put((Member) member, new HashMap<>());
}
for (Entry<String, Data> dataForKey : externalData.entrySet()) {
Member member = partitionedService.getKeyOwner(dataForKey.getKey());
dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
}
return dataForMembers;
}
这样一来,集群中的每个成员将只有一个入口处理器调用,每个成员将只获得它感兴趣的数据。
我使用 String
作为缓存键和任意 Data
类型作为与此键关联的数据,但您当然可以使用任何其他类型(并且您不必为外部建模数据完全作为地图)。
我想使用 hazelcast 或 coherence EntryProcessor 在不同节点上并行执行某些逻辑,其中密钥存储在缓存中。我看到我可以使用类似 sendToEachKey(EntryProcessor 进程)的东西。
当我还需要用逻辑发送一段数据来处理时,我的问题就来了,该数据也属于另一个系统并且我收到了它(例如在 http 请求中)。
当然我可以做类似 sendToEachKey(EntryProcessor(data) process) 的事情。但是,如果每个键的数据都不同,而我只想将他的数据发送到特定键进行处理,我该怎么做?为什么我要这样做是因为数据太大而且我有网络过载。
当然,如果我打开一个线程池将每个数据发送到每个键是可能的,但由于请求量很大,效率很低。
谢谢!
在 Hazelcast 中你会做 executeOnKeys(keys, new EntryProcessor(data))
,这太多了,因为数据太大了。
为什么不
executeOnKey(key1, new EntryProcessor(data1));
executeOnKey(key2, new EntryProcessor(data2));
executeOnKey(key3, new EntryProcessor(data3));
发送每个键需要的数据子集?
对于 Hazelcast,您可以检索所有值并发送它自己的每个键 EntryProcessor
,但是这会产生大量开销。
另一种选择是结合使用 EntryProcessor
和我们的分布式 ExecutorService
。
您将 Runnable
发送到 ExecutorService。在 Runnable
中,您检索本地键集,检索所有外部值(所有已经在节点本地的值),然后为每个本地键发出一个 EntryProcessor
。由于您已经在节点本地,因此不再有流量四处流动(除了备份,显然 :))。也就是说,您可能想要实现一个特定的 EntryProcessor
,它只传输更改后的值而不传输完整的处理器本身(以节省更多流量)。
在 Coherence 中,您可以使用 PartitionedService
来查找缓存键与集群成员的关联。然后您可以使用每个成员的数据调用入口处理器,使用 PartitionedFilter
确保数据仅发送给该成员。像这样:
// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) {
PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);
for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
Member member = dataForMember.getKey();
Map<String, Data> data = dataForMember.getValue();
PartitionSet partitions = partitionedService.getOwnedPartitions(member);
PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
EntryProcessor processor = new MyEntryProcessor(data);
cache.async().invokeAll(filter, processor);
}
}
Map<Member, Map<String, Data>> splitDataByMembers(
PartitionedService partitionedService,
Map<String, Data> externalData) {
Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();
for (Object member : partitionedService.getInfo().getServiceMembers()) {
dataForMembers.put((Member) member, new HashMap<>());
}
for (Entry<String, Data> dataForKey : externalData.entrySet()) {
Member member = partitionedService.getKeyOwner(dataForKey.getKey());
dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
}
return dataForMembers;
}
这样一来,集群中的每个成员将只有一个入口处理器调用,每个成员将只获得它感兴趣的数据。
我使用 String
作为缓存键和任意 Data
类型作为与此键关联的数据,但您当然可以使用任何其他类型(并且您不必为外部建模数据完全作为地图)。