Hazelcast Jet vs Java 8 streams

I'm trying to use Hazelcast Jet to sort a List<Map<String,Object>> object by date so that I can pick the entry with the max date.

Here is my working Java 8 code:

public static List<Map<String, Object>> extractDate1(List<Map<String, Object>> data) {
    return data.stream().map(value -> new Object() {
        Map<String, Object> theMap = value;
        LocalDate date = extractDate(value);
    }).sorted(Comparator.comparing(obj -> obj.date)).map(obj -> obj.theMap).collect(Collectors.toList());
}

public static LocalDate extractDate(Map<String, Object> value) {
    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("dd-MM-yyyy");
    DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d-MM-yyyy");
    return LocalDate.parse(LocalDate.parse(value.get("effectiveDate").toString(), formatter2).format(formatter1),
            formatter);
}

The Java 8 code above sorts the map objects from the lowest date to the highest.

Below is the Jet code I'm trying; it also gives the correct output. However, I would like to use Hazelcast Jet's aggregate/rolling functionality instead.

// fetching jsonb type data from db
BatchStage<Object> jobJson = dbValue
        // this model holds the JSON value as a string;
        // convert the JSON data to a Map object
        .map(model -> JsonUtil.mapFrom(model.getJosnValue()))
        .filter(map -> map.size() != 0)
        .map(map -> {
            // each JSON/map object holds an array, and that array holds multiple JSON objects;
            // pick the JSON object with the max date
            if (!map.containsKey("records")) {
                // nothing to extract; Jet emits no item when the mapping function returns null
                return null;
            }
            // calling the external function (the Java 8 code above)
            List<Map<String, Object>> extractedDateValue = extractMapBasedOnMax(
                    (List<Map<String, Object>>) map.get("records"));
            return extractedDateValue.get(extractedDateValue.size() - 1);
        });

Sample JSON data:

{
    "id": "01",
    "records": [{
        "location": "xyz1",
        "effectiveDate": "02-03-2021"
    }, {
        "location": "xyz2",
        "effectiveDate": "02-04-2021"
    }]
}

Expected output:

{
    "location": "xyz2",
    "effectiveDate": "02-04-2021"
}

Is it possible to achieve this with Hazelcast Jet rolling aggregation? Any suggestions would be helpful. Thanks.

Consider flatMapping the pipeline and finding the maximum using topN. flatMap would convert each JSON structure to a series of [id, location, effectiveDate] records. See the documentation of flatMap for a code example.
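To make the flattening step concrete, here is a minimal sketch in plain java.util.stream rather than Jet, so it runs without any dependency. The Rec class, the flatten and sampleDoc helpers, and the dd-MM-yyyy date format are assumptions made for illustration (the format is guessed from the sample JSON). In a Jet pipeline the same function body would presumably sit inside the flatMap stage.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlattenRecords {
    // Assumption: dates look like "02-04-2021", read as dd-MM-yyyy.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Hypothetical flat record: one per element of the "records" array.
    static final class Rec {
        final String id;
        final String location;
        final LocalDate effectiveDate;

        Rec(String id, String location, LocalDate effectiveDate) {
            this.id = id;
            this.location = location;
            this.effectiveDate = effectiveDate;
        }

        @Override
        public String toString() {
            return "[" + id + ", " + location + ", " + effectiveDate + "]";
        }
    }

    // Turns one parsed JSON document into a stream of flat records.
    // Documents without a "records" list contribute nothing.
    @SuppressWarnings("unchecked")
    static Stream<Rec> flatten(Map<String, Object> doc) {
        Object records = doc.get("records");
        if (!(records instanceof List)) {
            return Stream.empty();
        }
        String id = String.valueOf(doc.get("id"));
        return ((List<Map<String, Object>>) records).stream()
                .map(r -> new Rec(id,
                        String.valueOf(r.get("location")),
                        LocalDate.parse(String.valueOf(r.get("effectiveDate")), FORMAT)));
    }

    // Builds the sample document from the question as nested maps.
    static Map<String, Object> sampleDoc() {
        List<Map<String, Object>> records = new ArrayList<>();
        records.add(record("xyz1", "02-03-2021"));
        records.add(record("xyz2", "02-04-2021"));
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", "01");
        doc.put("records", records);
        return doc;
    }

    private static Map<String, Object> record(String location, String date) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("location", location);
        r.put("effectiveDate", date);
        return r;
    }

    public static void main(String[] args) {
        List<Rec> flat = Stream.of(sampleDoc())
                .flatMap(FlattenRecords::flatten)
                .collect(Collectors.toList());
        flat.forEach(System.out::println);
    }
}
```

Carrying the id into every flat record is what makes a later per-id grouping possible.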

It is not clear whether you want the maximum element of the whole collection or the maximum per id. Adding groupingKey finds the maximum for each id.
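The difference between the two readings can be sketched with plain Java collectors, which stand in here for a Jet aggregate with and without groupingKey. This is an analogue under assumptions, not Jet code: the Rec class, the helper names, and the extra id "02" test records are made up for illustration, and dates are assumed to be dd-MM-yyyy.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

final class MaxByDate {
    // Assumption: dd-MM-yyyy, as suggested by the sample JSON.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Hypothetical flat record [id, location, effectiveDate].
    static final class Rec {
        final String id;
        final String location;
        final LocalDate effectiveDate;

        Rec(String id, String location, String effectiveDate) {
            this.id = id;
            this.location = location;
            this.effectiveDate = LocalDate.parse(effectiveDate, FORMAT);
        }
    }

    static final Comparator<Rec> BY_DATE = Comparator.comparing((Rec r) -> r.effectiveDate);

    // No grouping: a single maximum over all records.
    static Optional<Rec> globalMax(List<Rec> recs) {
        return recs.stream().max(BY_DATE);
    }

    // Grouping by id: one maximum per id.
    static Map<String, Rec> maxPerId(List<Rec> recs) {
        return recs.stream().collect(
                Collectors.toMap(r -> r.id, Function.identity(), BinaryOperator.maxBy(BY_DATE)));
    }

    // Made-up input: the two records from the question plus two for a second id.
    static List<Rec> sample() {
        return Arrays.asList(
                new Rec("01", "xyz1", "02-03-2021"),
                new Rec("01", "xyz2", "02-04-2021"),
                new Rec("02", "xyz3", "15-01-2021"),
                new Rec("02", "xyz4", "01-06-2021"));
    }

    public static void main(String[] args) {
        List<Rec> recs = sample();
        System.out.println("global: " + globalMax(recs).map(r -> r.location).orElse("none"));
        maxPerId(recs).forEach((id, r) -> System.out.println(id + ": " + r.location));
    }
}
```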

The shape of the pipeline in "meta-code":

source // stream of JSON structures
.flatMap // stream of [id, location, effectiveDate]
.groupingKey // for maximum per id; remove for global max
.aggregate(AggregateOperations.topN) // finds max
.sink;
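To show why a top-N aggregation with N = 1 yields the maximum, here is a small plain-Java sketch of a top-N accumulator that sees one item at a time. It is not Jet's implementation: the TopNSketch, Accumulator, and Rec names are hypothetical, and it only illustrates the general idea of keeping the N largest items seen so far.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TopNSketch {
    // Assumption: dd-MM-yyyy, as suggested by the sample JSON.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Hypothetical record with only the fields needed here.
    static final class Rec {
        final String location;
        final LocalDate effectiveDate;

        Rec(String location, String effectiveDate) {
            this.location = location;
            this.effectiveDate = LocalDate.parse(effectiveDate, FORMAT);
        }
    }

    static final Comparator<Rec> BY_DATE = Comparator.comparing((Rec r) -> r.effectiveDate);

    // Keeps at most n items: the largest ones seen so far.
    static final class Accumulator<T> {
        private final int n;
        private final Comparator<T> comparator;
        private final PriorityQueue<T> heap; // min-heap: smallest retained item on top

        Accumulator(int n, Comparator<T> comparator) {
            this.n = n;
            this.comparator = comparator;
            this.heap = new PriorityQueue<>(comparator);
        }

        // Adds one item, then evicts the smallest if more than n are held.
        void accumulate(T item) {
            heap.add(item);
            if (heap.size() > n) {
                heap.poll();
            }
        }

        // Returns the retained items, largest first.
        List<T> result() {
            List<T> out = new ArrayList<>(heap);
            out.sort(comparator.reversed());
            return out;
        }
    }

    // Convenience wrapper: feeds every record through one accumulator.
    static List<Rec> topN(int n, List<Rec> recs) {
        Accumulator<Rec> acc = new Accumulator<>(n, BY_DATE);
        for (Rec r : recs) {
            acc.accumulate(r);
        }
        return acc.result();
    }

    public static void main(String[] args) {
        List<Rec> recs = Arrays.asList(
                new Rec("xyz1", "02-03-2021"),
                new Rec("xyz2", "02-04-2021"),
                new Rec("xyz3", "15-01-2021"));
        topN(1, recs).forEach(r -> System.out.println(r.location + " " + r.effectiveDate));
    }
}
```

If only the single latest record is ever needed, a plain maximum by the same comparator gives the same result as N = 1 and avoids wrapping it in a list.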