按时间对通量的元素进行分组
Grouping the Elements of a Flux by Time
假设我有一个实时股票报价流服务,它有一个 returns 和 Flux<Quote>
的端点。这是实时流数据,内容类型为text/event-stream
.
我想创建一个订阅这个数据的服务,对Quote
组进行聚合操作,比如一段时间内的总成交量、最高价和最低价。这些组将按时间分组。我还希望这是一个实时流,returns 随着每个时间段的过去,每单位时间收到的报价摘要。
我的问题是我该怎么做?我试过使用 Flux::groupBy
和 Flux::collectList
或类似的方法,但这会等待流完成,然后再为每个时间块发出聚合结果。在连续的报价流中,这将导致我的服务永远不会发出任何东西。
我可以通过从流媒体服务订阅 Flux<Quote>
流,然后发布到接收器来解决这个问题。像这样:
public class QuoteAggregatorService {
private Sinks.Many<AggregateQuote> aggregateSink = Sinks.many()
.multicast()
.onBackpressureBuffer();
private AggregateQuote aggregateQuote = new AggregateQuote();
private long groupTime = 0L;
private ProducerClient producerClient; // Spring reactive WebClient client to the streaming quote service
public Flux<AggregateQuote> getAggregateQuotes() {
producerClient.getQuotes(symbol)
.doOnNext(q -> {
if (getGroupTime(q) > groupTime) {
emitQuote();
aggregateQuote = new AggregateQuote();
groupTime = getGroupTime(q); // rounds to nearest minute
}
aggregateQuote.processQuote(q); // updates the aggregate values including volume, open, high, low, etc.
})
.doOnComplete(() -> {
emitQuote(); // have to emit the last aggregate quote on complete, otherwise it gets skipped
aggregateSink.tryEmitComplete(); // signals to the caller that the Flux has completed.
})
.subscribe();
return aggregateSink.asFlux();
}
private void emitQuote() {
aggregateSink.tryEmitNext(aggregateQuote);
}
}
假设我有一个实时股票报价流服务,它有一个 returns 和 Flux<Quote>
的端点。这是实时流数据,内容类型为text/event-stream
.
我想创建一个订阅这个数据的服务,对Quote
组进行聚合操作,比如一段时间内的总成交量、最高价和最低价。这些组将按时间分组。我还希望这是一个实时流,returns 随着每个时间段的过去,每单位时间收到的报价摘要。
我的问题是我该怎么做?我试过使用 Flux::groupBy
和 Flux::collectList
或类似的方法,但这会等待流完成,然后再为每个时间块发出聚合结果。在连续的报价流中,这将导致我的服务永远不会发出任何东西。
我可以通过从流媒体服务订阅 Flux<Quote>
流,然后发布到接收器来解决这个问题。像这样:
public class QuoteAggregatorService {
private Sinks.Many<AggregateQuote> aggregateSink = Sinks.many()
.multicast()
.onBackpressureBuffer();
private AggregateQuote aggregateQuote = new AggregateQuote();
private long groupTime = 0L;
private ProducerClient producerClient; // Spring reactive WebClient client to the streaming quote service
public Flux<AggregateQuote> getAggregateQuotes() {
producerClient.getQuotes(symbol)
.doOnNext(q -> {
if (getGroupTime(q) > groupTime) {
emitQuote();
aggregateQuote = new AggregateQuote();
groupTime = getGroupTime(q); // rounds to nearest minute
}
aggregateQuote.processQuote(q); // updates the aggregate values including volume, open, high, low, etc.
})
.doOnComplete(() -> {
emitQuote(); // have to emit the last aggregate quote on complete, otherwise it gets skipped
aggregateSink.tryEmitComplete(); // signals to the caller that the Flux has completed.
})
.subscribe();
return aggregateSink.asFlux();
}
private void emitQuote() {
aggregateSink.tryEmitNext(aggregateQuote);
}
}