Hazelcast/Coherence 具有每个键数据的网格计算 EntryProcessor

Hazelcast/Coherence Grid Computing EntryProcessor with data for each key

我想使用 hazelcast 或 coherence EntryProcessor 在不同节点上并行执行某些逻辑,其中密钥存储在缓存中。我看到我可以使用类似 sendToEachKey(EntryProcessor 进程)的东西。

当我还需要用逻辑发送一段数据来处理时,我的问题就来了,该数据也属于另一个系统并且我收到了它(例如在 http 请求中)。

当然我可以做类似 sendToEachKey(EntryProcessor(data) process) 的事情。但是,如果每个键的数据都不同,而我只想将他的数据发送到特定键进行处理,我该怎么做?为什么我要这样做是因为数据太大而且我有网络过载。

当然,如果我打开一个线程池将每个数据发送到每个键是可能的,但由于请求量很大,效率很低。

谢谢!

在 Hazelcast 中你会做 executeOnKeys(keys, new EntryProcessor(data)),这太多了,因为数据太大了。

为什么不

executeOnKey(key1, new EntryProcessor(data1));
executeOnKey(key2, new EntryProcessor(data2));
executeOnKey(key3, new EntryProcessor(data3));

发送每个键需要的数据子集?

对于 Hazelcast,您可以检索所有值并发送它自己的每个键 EntryProcessor,但是这会产生大量开销。

另一种选择是结合使用 EntryProcessor 和我们的分布式 ExecutorService

您将 Runnable 发送到 ExecutorService。在 Runnable 中,您检索本地键集,检索所有外部值(所有已经在节点本地的值),然后为每个本地键发出一个 EntryProcessor。由于您已经在节点本地,因此不再有流量四处流动(除了备份,显然 :))。也就是说,您可能想要实现一个特定的 EntryProcessor,它只传输更改后的值而不传输完整的处理器本身(以节省更多流量)。

在 Coherence 中,您可以使用 PartitionedService 来查找缓存键与集群成员的关联。然后您可以使用每个成员的数据调用入口处理器,使用 PartitionedFilter 确保数据仅发送给该成员。像这样:

// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) { 
    PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
    Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);

    for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
        Member member = dataForMember.getKey();
        Map<String, Data> data = dataForMember.getValue();

        PartitionSet partitions = partitionedService.getOwnedPartitions(member);
        PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
        EntryProcessor processor = new MyEntryProcessor(data);
        cache.async().invokeAll(filter, processor);
    }
}

Map<Member, Map<String, Data>> splitDataByMembers(
        PartitionedService partitionedService,
        Map<String, Data> externalData) {
    Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();

    for (Object member : partitionedService.getInfo().getServiceMembers()) {
        dataForMembers.put((Member) member, new HashMap<>());
    }
    for (Entry<String, Data> dataForKey : externalData.entrySet()) {
        Member member = partitionedService.getKeyOwner(dataForKey.getKey());
        dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
    }
    return dataForMembers;
}

这样一来,集群中的每个成员将只有一个入口处理器调用,每个成员将只获得它感兴趣的数据。

我使用 String 作为缓存键和任意 Data 类型作为与此键关联的数据,但您当然可以使用任何其他类型(并且您不必为外部建模数据完全作为地图)。