Java 8 个要按大于最旧元素的平均值过滤的流

Java 8 Streams to filter by average greater than oldest element

我正在尝试过滤一个城市中的一群 person,使他们的平均年龄大于数据库中具有最年长 created_at 时间戳的人的年龄.

我正在做类似下面的事情,

LinkedBlockingDeque<Person> allAges = null;
LinkedBlockingDeque<Person> filteredAges = new LinkedBlockingDeque<Person>();

allAges = ageStorage.getAllAgesByCityOrderByInsertionTime("city A");

allAges.stream()
       .filter(this.getFirstInsertedAgeGreaterThanAverage(allAges))
       .forEach(filteredAges::add);

getFirstInsertedAgeGreaterThanAverage如下,

private static Predicate<Integer> getFirstInsertedAgeGreaterThanAverage(LinkedBlockingDeque<Person> personList){
    return p -> (personList.stream().mapToInt(Person::getAge).average() >
     personList.peekFirst().getAge());
}

我想这里有些地方不太对劲,但不确定是什么...有没有办法在没有 getFirstInsertedAgeGreaterThanAverage 方法的情况下完成此操作

从你的问题中不清楚你想要哪个子集。只包括一个最大年龄的人(如果恰好是第一个,则不包括任何人)是一个有效的答案。所以我假设您想获得最大可能的此类子集。正如@tobias_k 所注意到的,这可以通过按年龄、递减和 select 平均不超过限制的最长前缀对输入进行排序来解决。

不幸的是,这无法在使用标准 Stream API 的单个 Stream 中解决。可能的解决方案可能如下所示:

public static List<Person> maxSubSetWithGreaterAverage(Collection<Person> persons,
        int averageLimit) {
    List<Person> list = new ArrayList<>(persons);
    // Sort people by age, decreasing
    list.sort(Comparator.comparingInt(Person::getAge).reversed());
    // get all the ages
    int[] ages = list.stream().mapToInt(Person::getAge).toArray();
    // transform them to cumulative sums
    Arrays.parallelPrefix(ages, Integer::sum);
    // Find the longest prefix for which the cumulative sum is bigger
    // than average
    int length = IntStream.range(0, ages.length)
            .filter(count -> ages[count] <= averageLimit * (count + 1)).findFirst()
            .orElse(ages.length);
    // return the corresponding subList
    return list.subList(0, length);
}

用法:

List<Person> filtered = maxSubSetWithGreaterAverage(allAges, 
            allAges.peekFirst().getAge());

然而,如果不使用 Stream API 和 parallelPrefix,解决方案看起来更好,运行速度更快,占用的内存更少:

public static List<Person> maxSubSetWithGreaterAverage(Collection<Person> persons,
        int averageLimit) {
    List<Person> list = new ArrayList<>(persons);
    list.sort(Comparator.comparingInt(Person::getAge).reversed());
    int cumulativeAge = 0;
    for(int i=0; i<list.size(); i++) {
        cumulativeAge += list.get(i).getAge();
        if(cumulativeAge <= averageLimit * (i + 1) )
            return list.subList(0, i);
    }
    return list;
}

使用我的 StreamEx 库可以定义自定义中间操作,它将在单个流中执行必要的过滤,尽管这需要高级魔法:

public static <T> UnaryOperator<StreamEx<T>> takeWhileAverageGreater(
        ToIntFunction<? super T> keyExtractor, int averageLimit) {
    return s -> takeWhileAverageGreater(
            s.sorted(Comparator.comparingInt(keyExtractor).reversed()),
            keyExtractor, 0L, 0L, averageLimit);
}

private static <T> StreamEx<T> takeWhileAverageGreater(StreamEx<T> input,
        ToIntFunction<? super T> keyExtractor, long count, long cumulativeSum,
        int averageLimit) {
    return input.headTail((head, tail) -> {
        // head is the first element, tail is the Stream of the rest
        // update current sum
        long newSum = cumulativeSum + keyExtractor.applyAsInt(head);
        // short-circuit via null if the limit is reached
        // otherwise call myself for the tail prepending with head
        return newSum <= averageLimit * (count + 1) ? null :
           takeWhileAverageGreater(tail, keyExtractor, count + 1, newSum, averageLimit)
               .prepend(head);
    });
}

现在新的takeWhileAverageGreater操作可以这样使用:

List<Person> filtered = StreamEx.of(allAges)
        .chain(takeWhileAverageGreater(Person::getAge, allAges.peekFirst().getAge()))
        .toList();

结果是一样的