如何按 LocalDate 对通量元素进行分组?

How to group flux elements by LocalDate?

给定一个 Flux<Event>,其中事件是:

data class Event(val eventDate: LocalDate, val data: String)

考虑到有些元素会在完全相同的 LocalDate 发生,我想按事件日期升序对 Flux 元素进行分组。

目标是最终得到类似 Flux<GroupedEvent> 的东西,其中 GroupedEvent 是:

data class GroupedEvent(val eventDate: LocalDate, val data: List<String>)

通知data: List<String> 将包含同时发生的所有数据LocalDate

如何做到这一点?

对于给定的数据结构:

public class GroupedEvent {
    private LocalDate localDate;
    private List<String> data;
}
public class Event {
    private LocalDate eventDate;
    private String data;
}

你可以这样试试:

        List<Event> data = Arrays.asList(
                new Event(LocalDate.of(2019, 1, 1), "A"),
                new Event(LocalDate.of(2019, 1, 2), "B"),
                new Event(LocalDate.of(2019, 1, 3), "C"),
                new Event(LocalDate.of(2019, 1, 1), "D"),
                new Event(LocalDate.of(2019, 2, 1), "E")
        );

        Flux<Event> eventFlux = Flux.fromIterable(data);
        Flux<GroupedFlux<LocalDate, Event>> groupedFluxFlux = eventFlux.groupBy(Event::getEventDate);
        groupedFluxFlux.flatMap(groupedFlux ->
                groupedFlux
                        .map(Event::getData)
                        .collectList()
                        .map(list -> new GroupedEvent(groupedFlux.key(), list))
        )
                .sort(Comparator.comparing(GroupedEvent::getLocalDate))
                .doOnNext(groupedEvent -> System.out.println(groupedEvent.getLocalDate() + " -> " + groupedEvent.getData()))
                .subscribe();