在 REST/WebFlux 上公开整个 IMap 而不会过度使用堆

Expose entire IMap over REST/WebFlux without excessive Heap utilisation

我有一个可能非常大的分布式 Hazelcast 地图 (IMap)。

我的任务是 return 将整个地图 values() 集合作为对 HTTP GET 请求的响应。

为了最大限度地减少堆利用率,我计划使用 Spring WebFlux 和 return Flux 实例。

我担心的是 IMap#values().iterator().next() 的调用隐含在 Flux.fromIterable() 中,可能 反序列化所有来自所有集群成员的值,从而破坏了 JVM 的堆,JVM 是服务于 GET 请求的 Hazelcast 客户端。

如果这种担忧是有根据的,那么:

Hazelcast Jet 会提供解决方案吗?我可以创建一个 Pipeline.withSource(IMap),但是我如何将接收器创建为可以 returned 的 Flux 实例?

非常感谢,罗宾。

这种担忧是有道理的。实际上有一个查询大小限制(参见 here),对于大地图,values() 调用将失败。

Jet 在 request-response 场景中没有用处:它可以以流方式处理大型地图,但它会将地图条目传送到接收器而不是调用方。您也许可以破解它,但这并不简单。

在即将推出的 Hazelcast 4.1 中,将有 SQL API 最适合您的用例:如果您使用 SQL 查询地图,甚至可以流式传输大量结果没有持续内存使用的客户端。

作为解决方法,您可以查看 Jet 地图 reader 的支持代码:ReadMapOrCacheP.java, it uses an internal API 以增量读取地图。但它是一个不受支持的内部 API,每个版本都可以 changed/removed。

作为对 Oliv 答案的补充,IMDG 4.0 中针对 IMap 内容引入了一个内部迭代器实现,它不会检索整个地图内容,并且在面对并发突变和故障时应该能正常工作。

仍然,使用它有点复杂,因为我们还没有通过 public API 公开它。对于 javadoc 和 member-side 实现,请参阅 here.

示例代码:

public static void main(String[] args) {
        HazelcastInstance i1 = Hazelcast.newHazelcastInstance();
        HazelcastInstance i2 = Hazelcast.newHazelcastInstance();
        HazelcastInstance i3 = Hazelcast.newHazelcastInstance();
        IMap<Integer, Integer> sampleMap = i1.getMap("map");

        // ingest
        for (int i = 0; i < 100; i++) {
            sampleMap.put(i, i);
        }

        // read partition by partition
        for (Partition partition : i1.getPartitionService().getPartitions()) {
            int fetchSize = 10;
            boolean prefetchValues = true;

            MapProxyImpl mapImpl = (MapProxyImpl) sampleMap;
            // in case you're reading from the client, use:
            // ClientMapProxy mapImpl = (ClientMapProxy) sampleMap;
            Iterator<Entry<String, String>> partitionIterator =
                    mapImpl.iterator(fetchSize, partition.getPartitionId(), prefetchValues);

            while (partitionIterator.hasNext()) {
                Entry<String, String> next = partitionIterator.next();
                System.out.println("Fetched entry " + next);
            }
        }
    }