如何使用Java Stream API解决滑动window问题?
How to use Java Stream API to resolve sliding window problems?
从 Java 8 开始,使用 Stream API 变得很流行。但是,有些问题可以使用基于批处理的算法轻松解决,但使用基于流的解决方案可能无法轻松解决。
例如,给定一张信用卡按时间顺序的交易流,我想找到该卡在每个 24小时内的总交易额,以便我可以比较它用一个阈值来猜测卡是否被盗。
流数据可以简单到
transaction time amount
2019-01-23T10:12:31.484Z 100
2019-01-24T00:12:30.004Z 50
2019-01-24T09:00:00.000Z 23
2019-01-27T05:10:00.300Z 65
这可以看作是一个滑动window问题,需要检查元素的关系。基于批处理的解决方案不是很复杂。我可以使用 queue 只保留 24 小时内发生的交易。
该算法可以用以下步骤大致描述:
创建一个队列并将第一个事务放入队列;
比较下一笔交易和队头的交易
查看2笔交易的交易时间差
如果时差小于24小时,
将事务添加到队列并return到步骤 2。
否则如果超过 24 小时,则
5.1。计算队列中交易的总交易量,因为这些交易发生在 24 小时内
5.2将结果放入结果列表
5.3 轮询队列以删除最旧的事务,直到新事务发生在队列中的事务少于 24 小时。
5.4 循环到第 2 步。
但是,我发现使用Java流API很难实现上述算法,所以我想知道使用Java流来实现滑动window问题?如果是,任何人都可以使用 Java Stream 给出一些提示或一些伪代码来实现它吗?没有必要使用上述算法。任何基于流的算法都可以。
这是如何使用 reduce
完成的示例。
为了收集数据,创建了 class:
class Row {
private LocalDateTime date;
private Integer value;
public Row(LocalDateTime date, Integer value) {
this.date = date;
this.value = value;
}
// getters and setters
首先将示例数据读入流。
Stream<Row> readData = Stream.of(
"2019-01-23T10:12:31 100",
"2019-01-24T00:12:30 50",
"2019-01-24T09:00:00 23",
"2019-01-25T03:00:00 23",
"2019-01-27T05:10:00 65")
.map(s -> s.split("\s+"))
.map(a -> new Row(LocalDateTime.parse(a[0], DateTimeFormatter.ISO_LOCAL_DATE_TIME), Integer.valueOf(a[1])));
比 reduce
方法每 24 小时收集一次行到分隔列表并将其存储在主列表中。
List<List<Row>> all = new ArrayList<>();
all.add(readData
.map(Arrays::asList)
.reduce(new ArrayList<>(), (a, v) -> {
if (a.isEmpty()) {
a.addAll(v);
} else {
LocalDateTime first = a.get(0).getDate().plusHours(24);
if (first.isAfter(v.get(0).getDate())) {
a.addAll(v);
} else {
all.add(a);
LocalDateTime last = v.get(0).getDate().minusHours(24);
a = new ArrayList<>(a.stream()
.filter(r -> last.isBefore(r.getDate()))
.collect(Collectors.toList()));
a.addAll(v);
}
}
return a;
}));
最后您可以打印出所有列表或计算每个时期的交易价值。
all.forEach(System.out::println);
all.stream().map(l -> l.stream()
.map(Row::getValue)
.reduce(Integer::sum)
.get()
)
.forEach(System.out::println);
更新
reduce
也可以用简单的forEach
代替。
那么主要部分就是这样并且不那么复杂:
LinkedList<List<Row>> all = new LinkedList<>(Arrays.asList(new ArrayList<>()));
readData
.forEach(v -> {
if (!all.getLast().isEmpty()) {
// check if in 24h boundary
LocalDateTime upperValue = all.getLast().get(0).getDate().plusHours(24);
if (!upperValue.isAfter(v.getDate())) {
// create copy with row older earlier than 24h
LocalDateTime lowerValue = v.getDate().minusHours(24);
all.add(new ArrayList<>(all.getLast().stream()
.filter(r -> lowerValue.isBefore(r.getDate()))
.collect(Collectors.toList())));
}
}
all.getLast().add(v);
});
通过 Streams 执行此操作的 "problem" 是它们被设计为易于并行化。它们在设计上 不是 仅顺序算法,因此纯顺序算法(如您的基于队列的算法)不适用于 Streams。这是从命令式编程到函数式编程的飞跃,您有时需要一种新算法、新方法。将命令式代码转换为函数式代码并不总是那么简单。
如果你的数据源可以很容易split multiple times (which, I think, is equivalent to being able to provide the required windows efficiently and concurrently), you can do this with Streams. For example, you could (re)use StreamEx's ofSublists()
方法如果你的数据源是List
。这是有效的,因为任何子列表的创建都是高效的并且独立于其他子列表,因此可以安全地并发调用。
或者,您可以假设您的 Stream 永远不会 运行 并行化,并使用专注于纯序列的东西,例如jOOL: Seq.sliding()
。这适用于任何 Seq,因为实现可以使用迭代状态而不用担心并行性。
从 Java 8 开始,使用 Stream API 变得很流行。但是,有些问题可以使用基于批处理的算法轻松解决,但使用基于流的解决方案可能无法轻松解决。
例如,给定一张信用卡按时间顺序的交易流,我想找到该卡在每个 24小时内的总交易额,以便我可以比较它用一个阈值来猜测卡是否被盗。 流数据可以简单到
transaction time amount
2019-01-23T10:12:31.484Z 100
2019-01-24T00:12:30.004Z 50
2019-01-24T09:00:00.000Z 23
2019-01-27T05:10:00.300Z 65
这可以看作是一个滑动window问题,需要检查元素的关系。基于批处理的解决方案不是很复杂。我可以使用 queue 只保留 24 小时内发生的交易。 该算法可以用以下步骤大致描述:
创建一个队列并将第一个事务放入队列;
比较下一笔交易和队头的交易
查看2笔交易的交易时间差
如果时差小于24小时,
将事务添加到队列并return到步骤 2。
否则如果超过 24 小时,则
5.1。计算队列中交易的总交易量,因为这些交易发生在 24 小时内
5.2将结果放入结果列表
5.3 轮询队列以删除最旧的事务,直到新事务发生在队列中的事务少于 24 小时。
5.4 循环到第 2 步。
但是,我发现使用Java流API很难实现上述算法,所以我想知道使用Java流来实现滑动window问题?如果是,任何人都可以使用 Java Stream 给出一些提示或一些伪代码来实现它吗?没有必要使用上述算法。任何基于流的算法都可以。
这是如何使用 reduce
完成的示例。
为了收集数据,创建了 class:
class Row {
private LocalDateTime date;
private Integer value;
public Row(LocalDateTime date, Integer value) {
this.date = date;
this.value = value;
}
// getters and setters
首先将示例数据读入流。
Stream<Row> readData = Stream.of(
"2019-01-23T10:12:31 100",
"2019-01-24T00:12:30 50",
"2019-01-24T09:00:00 23",
"2019-01-25T03:00:00 23",
"2019-01-27T05:10:00 65")
.map(s -> s.split("\s+"))
.map(a -> new Row(LocalDateTime.parse(a[0], DateTimeFormatter.ISO_LOCAL_DATE_TIME), Integer.valueOf(a[1])));
比 reduce
方法每 24 小时收集一次行到分隔列表并将其存储在主列表中。
List<List<Row>> all = new ArrayList<>();
all.add(readData
.map(Arrays::asList)
.reduce(new ArrayList<>(), (a, v) -> {
if (a.isEmpty()) {
a.addAll(v);
} else {
LocalDateTime first = a.get(0).getDate().plusHours(24);
if (first.isAfter(v.get(0).getDate())) {
a.addAll(v);
} else {
all.add(a);
LocalDateTime last = v.get(0).getDate().minusHours(24);
a = new ArrayList<>(a.stream()
.filter(r -> last.isBefore(r.getDate()))
.collect(Collectors.toList()));
a.addAll(v);
}
}
return a;
}));
最后您可以打印出所有列表或计算每个时期的交易价值。
all.forEach(System.out::println);
all.stream().map(l -> l.stream()
.map(Row::getValue)
.reduce(Integer::sum)
.get()
)
.forEach(System.out::println);
更新
reduce
也可以用简单的forEach
代替。
那么主要部分就是这样并且不那么复杂:
LinkedList<List<Row>> all = new LinkedList<>(Arrays.asList(new ArrayList<>()));
readData
.forEach(v -> {
if (!all.getLast().isEmpty()) {
// check if in 24h boundary
LocalDateTime upperValue = all.getLast().get(0).getDate().plusHours(24);
if (!upperValue.isAfter(v.getDate())) {
// create copy with row older earlier than 24h
LocalDateTime lowerValue = v.getDate().minusHours(24);
all.add(new ArrayList<>(all.getLast().stream()
.filter(r -> lowerValue.isBefore(r.getDate()))
.collect(Collectors.toList())));
}
}
all.getLast().add(v);
});
通过 Streams 执行此操作的 "problem" 是它们被设计为易于并行化。它们在设计上 不是 仅顺序算法,因此纯顺序算法(如您的基于队列的算法)不适用于 Streams。这是从命令式编程到函数式编程的飞跃,您有时需要一种新算法、新方法。将命令式代码转换为函数式代码并不总是那么简单。
如果你的数据源可以很容易split multiple times (which, I think, is equivalent to being able to provide the required windows efficiently and concurrently), you can do this with Streams. For example, you could (re)use StreamEx's ofSublists()
方法如果你的数据源是List
。这是有效的,因为任何子列表的创建都是高效的并且独立于其他子列表,因此可以安全地并发调用。
或者,您可以假设您的 Stream 永远不会 运行 并行化,并使用专注于纯序列的东西,例如jOOL: Seq.sliding()
。这适用于任何 Seq,因为实现可以使用迭代状态而不用担心并行性。