Hazelcast 聚合 API 导致带有谓词的 ClassCastException

Hazelcast Aggregations API results in ClassCastException with Predicates

我正在使用 Hazelcast IMap 实例来保存如下对象:

public class Report implements Portable, Comparable<Report>, Serializable 
{
    private String id;
    private String name;
    private String sourceId;
    private Date timestamp;
    private Map<String,Object> payload;
  // ...
}

IMapid 键控,我还在 sourceId 上创建了索引,因为我需要根据该字段进行查询和聚合。

IMap<String, Report> reportMap = hazelcast.getMap("reports");
reportMap.addIndex("sourceId", false);

我一直在尝试使用聚合框架来计算 sourceId 的报告。 尝试 #1:

  public static int reportCountforSource(String sourceId) 
  {
      EntryObject e = new PredicateBuilder().getEntryObject();
      Predicate<String, Report> predicate = e.get("sourceId").equal(sourceId);
      Supplier<String, Report, Object> supplier = Supplier.fromPredicate(predicate);
      Long count = reportMap.aggregate(supplier, Aggregations.count());

      return count.intValue();
  }    

这导致 ClassCastException 被聚合框架抛出:

Caused by: java.lang.ClassCastException: com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper$SimpleEntry cannot be cast to com.hazelcast.query.impl.QueryableEntry
    at com.hazelcast.query.Predicates$AbstractPredicate.readAttribute(Predicates.java:859) 
    at com.hazelcast.query.Predicates$EqualPredicate.apply(Predicates.java:779) 
    at com.hazelcast.mapreduce.aggregation.impl.PredicateSupplier.apply(PredicateSupplier.java:58) 
    at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.map(SupplierConsumingMapper.java:55)
    at com.hazelcast.mapreduce.impl.task.KeyValueSourceMappingPhase.executeMappingPhase(KeyValueSourceMappingPhase.java:49)

然后我更改为使用 Predicates 而不是 PredicateBuilder().getEntryObject() 进行 尝试 #2:

  public static int reportCountforSource(String sourceId) 
  {
      @SuppressWarnings("unchecked")
      Predicate<String, Report> predicate = Predicates.equal("sourceId", sourceId);
      Supplier<String, Report, Object> supplier = Supplier.fromPredicate(predicate);
      Long count = reportMap.aggregate(supplier, Aggregations.count());

      return count.intValue();
  }    

这导致了相同的 ClassCastException

最后,我在尝试 #3:

中使用了 lambda 来实现 Predicate 接口
  public static int reportCountforSource(String sourceId) 
  {
      Predicate<String, Report> predicate = (entry) ->  entry.getValue().getSourceId().equals(sourceId);
      Supplier<String, Report, Object> supplier = Supplier.fromPredicate(predicate);
      Long count = reportMap.aggregate(supplier, Aggregations.count());

      return count.intValue();
  }    

这次尝试终于成功了。

问题 #1:这是 Hazelcast 中的错误吗?聚合框架似乎应该支持由 PredicatesPredicateBuilder 构造的 Predicate?如果不是,则应创建一个新类型(例如 AggregationPredicate)以避免这种混淆。

问题#2(与#1 相关): 使用 lambda Predicate 导致我创建的索引未被使用。相反,地图中的每个条目都被反序列化以确定它是否与 Predicate 匹配,这会大大降低速度。有什么方法可以从将使用索引的 Predicate 创建 Supplier 吗? (编辑: 我通过在 readPortable 方法中放置一个计数器来验证每个条目都被反序列化)。

这看起来像是一个 Hazelcast 错误。我想我从来没有创建单元测试来测试由 PredicateBuilder 创建的 Predicate。你能在 github 提交问题吗?

目前 mapreduce 不支持索引,无论您尝试什么。索引系统将在不久的将来被重写,以支持各种非原始索引,如 partial 或 stuff。

另一个尚不可用的东西是针对可移植对象的优化reader,这将防止完全反序列化。