如何使用Java Stream API解决滑动window问题?

How to use Java Stream API to resolve sliding window problems?

从 Java 8 开始,使用 Stream API 变得很流行。但是,有些问题可以使用基于批处理的算法轻松解决,但使用基于流的解决方案可能无法轻松解决。

例如,给定一张信用卡按时间顺序的交易流,我想找到该卡在每个 24小时内的总交易额,以便我可以比较它用一个阈值来猜测卡是否被盗。 流数据可以简单到

transaction time            amount
2019-01-23T10:12:31.484Z    100
2019-01-24T00:12:30.004Z    50
2019-01-24T09:00:00.000Z    23
2019-01-27T05:10:00.300Z    65

这可以看作是一个滑动window问题,需要检查元素的关系。基于批处理的解决方案不是很复杂。我可以使用 queue 只保留 24 小时内发生的交易。 该算法可以用以下步骤大致描述:

  1. 创建一个队列并将第一个事务放入队列;

  2. 比较下一笔交易和队头的交易

  3. 查看2笔交易的交易时间差

    如果时差小于24小时,

  4. 将事务添加到队列并return到步骤 2。

    否则如果超过 24 小时,则

    5.1。计算队列中交易的总交易量,因为这些交易发生在 24 小时内

    5.2将结果放入结果列表

    5.3 轮询队列以删除最旧的事务,直到新事务发生在队列中的事务少于 24 小时。

    5.4 循环到第 2 步。

但是,我发现使用Java流API很难实现上述算法,所以我想知道使用Java流来实现滑动window问题?如果是,任何人都可以使用 Java Stream 给出一些提示或一些伪代码来实现它吗?没有必要使用上述算法。任何基于流的算法都可以。

这是如何使用 reduce 完成的示例。 为了收集数据,创建了 class:

class Row {

    private LocalDateTime date;
    private Integer value;

    public Row(LocalDateTime date, Integer value) {
        this.date = date;
        this.value = value;
    }

    // getters and setters

首先将示例数据读入流。

Stream<Row> readData = Stream.of(
        "2019-01-23T10:12:31    100",
        "2019-01-24T00:12:30    50",
        "2019-01-24T09:00:00    23",
        "2019-01-25T03:00:00    23",
        "2019-01-27T05:10:00    65")

        .map(s -> s.split("\s+"))
        .map(a -> new Row(LocalDateTime.parse(a[0], DateTimeFormatter.ISO_LOCAL_DATE_TIME), Integer.valueOf(a[1])));

reduce 方法每 24 小时收集一次行到分隔列表并将其存储在主列表中。

List<List<Row>> all = new ArrayList<>();
all.add(readData
        .map(Arrays::asList)
        .reduce(new ArrayList<>(), (a, v) -> {
            if (a.isEmpty()) {
                a.addAll(v);
            } else {
                LocalDateTime first = a.get(0).getDate().plusHours(24);
                if (first.isAfter(v.get(0).getDate())) {
                    a.addAll(v);
                } else {
                    all.add(a);

                    LocalDateTime last = v.get(0).getDate().minusHours(24);
                    a = new ArrayList<>(a.stream()
                            .filter(r -> last.isBefore(r.getDate()))
                            .collect(Collectors.toList()));
                    a.addAll(v);

                }
            }
            return a;
        }));

最后您可以打印出所有列表或计算每个时期的交易价值。

all.forEach(System.out::println);

all.stream().map(l -> l.stream()
        .map(Row::getValue)
        .reduce(Integer::sum)
        .get()
)
.forEach(System.out::println);

更新

reduce也可以用简单的forEach代替。 那么主要部分就是这样并且不那么复杂:

LinkedList<List<Row>> all = new LinkedList<>(Arrays.asList(new ArrayList<>()));
readData
        .forEach(v -> {
            if (!all.getLast().isEmpty()) {
                // check if in 24h boundary
                LocalDateTime upperValue = all.getLast().get(0).getDate().plusHours(24);
                if (!upperValue.isAfter(v.getDate())) {
                    // create copy with row older earlier than 24h
                    LocalDateTime lowerValue = v.getDate().minusHours(24);
                    all.add(new ArrayList<>(all.getLast().stream()
                            .filter(r -> lowerValue.isBefore(r.getDate()))
                            .collect(Collectors.toList())));
                }
            }
            all.getLast().add(v);
        });

通过 Streams 执行此操作的 "problem" 是它们被设计为易于并行化。它们在设计上 不是 仅顺序算法,因此纯顺序算法(如您的基于队列的算法)不适用于 Streams。这是从命令式编程到函数式编程的飞跃,您有时需要一种新算法、新方法。将命令式代码转换为函数式代码并不总是那么简单。

如果你的数据源可以很容易split multiple times (which, I think, is equivalent to being able to provide the required windows efficiently and concurrently), you can do this with Streams. For example, you could (re)use StreamEx's ofSublists()方法如果你的数据源是List。这是有效的,因为任何子列表的创建都是高效的并且独立于其他子列表,因此可以安全地并发调用。

或者,您可以假设您的 Stream 永远不会 运行 并行化,并使用专注于纯序列的东西,例如jOOL: Seq.sliding()。这适用于任何 Seq,因为实现可以使用迭代状态而不用担心并行性。