在 REST/WebFlux 上公开整个 IMap 而不会过度使用堆
Expose entire IMap over REST/WebFlux without excessive Heap utilisation
我有一个可能非常大的分布式 Hazelcast 地图 (IMap)。
我的任务是 return 将整个地图 values() 集合作为对 HTTP GET 请求的响应。
为了最大限度地减少堆利用率,我计划使用 Spring WebFlux 和 return Flux 实例。
我担心的是 IMap#values().iterator().next() 的调用隐含在 Flux.fromIterable() 中,可能 反序列化所有来自所有集群成员的值,从而破坏了 JVM 的堆,JVM 是服务于 GET 请求的 Hazelcast 客户端。
如果这种担忧是有根据的,那么:
Hazelcast Jet 会提供解决方案吗?我可以创建一个 Pipeline.withSource(IMap),但是我如何将接收器创建为可以 returned 的 Flux 实例?
非常感谢,罗宾。
这种担忧是有道理的。实际上有一个查询大小限制(参见 here),对于大地图,values()
调用将失败。
Jet 在 request-response 场景中没有用处:它可以以流方式处理大型地图,但它会将地图条目传送到接收器而不是调用方。您也许可以破解它,但这并不简单。
在即将推出的 Hazelcast 4.1 中,将有 SQL API 最适合您的用例:如果您使用 SQL 查询地图,甚至可以流式传输大量结果没有持续内存使用的客户端。
作为解决方法,您可以查看 Jet 地图 reader 的支持代码:ReadMapOrCacheP.java, it uses an internal API 以增量读取地图。但它是一个不受支持的内部 API,每个版本都可以 changed/removed。
作为对 Oliv 答案的补充,IMDG 4.0 中针对 IMap 内容引入了一个内部迭代器实现,它不会检索整个地图内容,并且在面对并发突变和故障时应该能正常工作。
仍然,使用它有点复杂,因为我们还没有通过 public API 公开它。对于 javadoc 和 member-side 实现,请参阅 here.
示例代码:
public static void main(String[] args) {
HazelcastInstance i1 = Hazelcast.newHazelcastInstance();
HazelcastInstance i2 = Hazelcast.newHazelcastInstance();
HazelcastInstance i3 = Hazelcast.newHazelcastInstance();
IMap<Integer, Integer> sampleMap = i1.getMap("map");
// ingest
for (int i = 0; i < 100; i++) {
sampleMap.put(i, i);
}
// read partition by partition
for (Partition partition : i1.getPartitionService().getPartitions()) {
int fetchSize = 10;
boolean prefetchValues = true;
MapProxyImpl mapImpl = (MapProxyImpl) sampleMap;
// in case you're reading from the client, use:
// ClientMapProxy mapImpl = (ClientMapProxy) sampleMap;
Iterator<Entry<String, String>> partitionIterator =
mapImpl.iterator(fetchSize, partition.getPartitionId(), prefetchValues);
while (partitionIterator.hasNext()) {
Entry<String, String> next = partitionIterator.next();
System.out.println("Fetched entry " + next);
}
}
}
我有一个可能非常大的分布式 Hazelcast 地图 (IMap)。
我的任务是 return 将整个地图 values() 集合作为对 HTTP GET 请求的响应。
为了最大限度地减少堆利用率,我计划使用 Spring WebFlux 和 return Flux 实例。
我担心的是 IMap#values().iterator().next() 的调用隐含在 Flux.fromIterable() 中,可能 反序列化所有来自所有集群成员的值,从而破坏了 JVM 的堆,JVM 是服务于 GET 请求的 Hazelcast 客户端。
如果这种担忧是有根据的,那么:
Hazelcast Jet 会提供解决方案吗?我可以创建一个 Pipeline.withSource(IMap),但是我如何将接收器创建为可以 returned 的 Flux 实例?
非常感谢,罗宾。
这种担忧是有道理的。实际上有一个查询大小限制(参见 here),对于大地图,values()
调用将失败。
Jet 在 request-response 场景中没有用处:它可以以流方式处理大型地图,但它会将地图条目传送到接收器而不是调用方。您也许可以破解它,但这并不简单。
在即将推出的 Hazelcast 4.1 中,将有 SQL API 最适合您的用例:如果您使用 SQL 查询地图,甚至可以流式传输大量结果没有持续内存使用的客户端。
作为解决方法,您可以查看 Jet 地图 reader 的支持代码:ReadMapOrCacheP.java, it uses an internal API 以增量读取地图。但它是一个不受支持的内部 API,每个版本都可以 changed/removed。
作为对 Oliv 答案的补充,IMDG 4.0 中针对 IMap 内容引入了一个内部迭代器实现,它不会检索整个地图内容,并且在面对并发突变和故障时应该能正常工作。
仍然,使用它有点复杂,因为我们还没有通过 public API 公开它。对于 javadoc 和 member-side 实现,请参阅 here.
示例代码:
public static void main(String[] args) {
HazelcastInstance i1 = Hazelcast.newHazelcastInstance();
HazelcastInstance i2 = Hazelcast.newHazelcastInstance();
HazelcastInstance i3 = Hazelcast.newHazelcastInstance();
IMap<Integer, Integer> sampleMap = i1.getMap("map");
// ingest
for (int i = 0; i < 100; i++) {
sampleMap.put(i, i);
}
// read partition by partition
for (Partition partition : i1.getPartitionService().getPartitions()) {
int fetchSize = 10;
boolean prefetchValues = true;
MapProxyImpl mapImpl = (MapProxyImpl) sampleMap;
// in case you're reading from the client, use:
// ClientMapProxy mapImpl = (ClientMapProxy) sampleMap;
Iterator<Entry<String, String>> partitionIterator =
mapImpl.iterator(fetchSize, partition.getPartitionId(), prefetchValues);
while (partitionIterator.hasNext()) {
Entry<String, String> next = partitionIterator.next();
System.out.println("Fetched entry " + next);
}
}
}