Flux: emit elements based on a date-time property

I have a scenario where I want to process events based on a date-time property carried by each event.

My failed attempt so far:

class Data {
    private Integer id;
    private LocalDateTime sendAt;
}

List<Data> data = Arrays.asList(
        new Data(1, LocalDateTime.now().plusSeconds(1)),
        new Data(2, LocalDateTime.now().plusSeconds(2)),
        new Data(3, LocalDateTime.now().plusSeconds(3)),
        new Data(4, LocalDateTime.now().plusSeconds(5)),
        new Data(5, LocalDateTime.now().plusSeconds(8)),
        new Data(6, LocalDateTime.now().plusSeconds(13)),
        new Data(7, LocalDateTime.now().plusSeconds(21)),
        new Data(8, LocalDateTime.now().plusSeconds(34)),
        new Data(9, LocalDateTime.now().plusSeconds(55)));

Flux<Data> dataFlux = Flux.fromIterable(data);

dataFlux.takeWhile(d -> d.sendAt.isAfter( LocalDateTime.now() ))
        .subscribe(x -> System.out.println(x));

I expect each event to be printed after 1, 2, 3, 5, 8, 13, 21, ... seconds.

Is this possible with Spring WebFlux / Reactor?

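Yes, it is possible by combining delayUntil with Mono.delay(<custom_time>) and HTTP streaming, using either MediaType.APPLICATION_STREAM_JSON_VALUE (deprecated since Spring Framework 5.3 in favor of MediaType.APPLICATION_NDJSON_VALUE) or MediaType.TEXT_EVENT_STREAM_VALUE for SSE:
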
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

@RestController
public class FluxDateTime {
    @GetMapping(value = "/time", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Data> getData() {
        List<Data> data = Arrays.asList(
                new Data(1, 1),
                new Data(2, 2),
                new Data(3, 3),
                new Data(4, 5),
                new Data(5, 8),
                new Data(6, 13),
                new Data(7, 21),
                new Data(8, 34),
                new Data(9, 55));

        return Flux.fromIterable(data)
                .delayUntil(d -> Mono.delay(Duration.ofSeconds(d.getDelay())));
    }

    /* The no-arg constructor and the getters/setters
       exist only for JSON serialization */
    private final class Data {
        private int id;
        private int delay;

        public Data() {}

        public Data(int id, int delay) {
            this.id = id;
            this.delay = delay;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public int getDelay() {
            return delay;
        }

        public void setDelay(int delay) {
            this.delay = delay;
        }
    }
}

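delayUntil holds back each element of the Flux until the Mono returned for that element terminates. Here each Mono is a Mono.delay whose duration, in seconds, is taken from the element itself. As far as I can tell the triggers are subscribed one after another, so each delay is measured from the emission of the previous element rather than from the start of the stream.

The result is a streaming HTTP response with the custom delays you asked for: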

$ curl http://localhost:8080/time
{"id":1,"delay":1} # after 1 sec
{"id":2,"delay":2} # after 2 sec
{"id":3,"delay":3} # and so on
{"id":4,"delay":5}
{"id":5,"delay":8}
{"id":6,"delay":13}
{"id":7,"delay":21}
{"id":8,"delay":34}
{"id":9,"delay":55}
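
The answer above uses a plain integer delay, while the question stores a LocalDateTime in sendAt. A minimal sketch of how the remaining time could be derived from such a timestamp is shown below. The class name SendDelays and the method untilSendAt are hypothetical names introduced for illustration, and the Clock parameter is an assumption made so that the calculation can be checked deterministically. Only java.time is used here; no Reactor call appears in the block.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Reactor or Spring.
final class SendDelays {
    private SendDelays() {}

    /**
     * Returns the time left until sendAt as seen by the given clock,
     * or Duration.ZERO if sendAt is already in the past.
     */
    static Duration untilSendAt(LocalDateTime sendAt, Clock clock) {
        Duration remaining = Duration.between(LocalDateTime.now(clock), sendAt);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        // A fixed clock keeps the demo independent of the wall clock.
        Clock clock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC);
        LocalDateTime now = LocalDateTime.now(clock);

        // Timestamps in the future yield their offset in seconds.
        for (int seconds : new int[] {1, 2, 3, 5, 8}) {
            System.out.println(untilSendAt(now.plusSeconds(seconds), clock).getSeconds());
        }
        // A timestamp in the past is clamped to zero.
        System.out.println(untilSendAt(now.minusSeconds(10), clock).getSeconds());
    }
}
```

With Reactor on the classpath, the resulting Duration could presumably be passed to Mono.delay inside delayUntil, or used as flatMap(d -> Mono.just(d).delayElement(...)) if every delay should count from subscription time instead of from the previous element. Treat that wiring as an untested suggestion rather than a verified recipe.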