Reactor EmitterProcessor 只保留最后 n 个元素?
Reactor EmitterProcessor that only retains last n elements?
如何创建只保留 最新 n 个元素的 EmitterProcessor
,这样即使没有订阅者它也能正常工作?
目前我创建了这样的处理器:
EmitterProcessor<Integer> processor = EmitterProcessor.create();
并且外部系统全天随机提供温度更新。在那个系统的回调中我做了:
void tempConsumer(int temp) {
processor.onNext(temp);
}
但是,一旦添加 processor.getBufferSize()
个元素,onNext(...)
就会阻塞。
如何创建一个处理器来丢弃最旧的元素,在这种情况下,而不是阻塞?
reactor-core #763. Simon Baslé first discusses a proposed change 到 EmitterProcessor
似乎在某种程度上涵盖了这一点,因此当 "sending data while there are NO subscribers [and] the queue contains bufferSize
elements, the oldest element is dropped and the onNext
is enqueued." 但在下一条评论中,他说 "we won't go ahead with my suggested change above. We instead advise you to use the sink()
rather than directly the onNext
. Namely, to use the onRequest
callback inside the sink()
to perform exactly as many sink.next(...)
as there are requests."
但是,如果我理解正确的话,这只涵盖了您可以按需计算新元素的情况,例如像这样:
FluxSink<Integer> sink = processor.sink();
Random random = new Random();
sink.onRequest(n -> random.nextInt()); // Generate next n requested elements.
但在我的情况下,我无法按需生成 最新的 n 个温度读数。当然,我可以维护我自己的最新读数的外部有界缓冲区,然后从 onRequest(...)
中读取,但我假设 Reactor 可以为我做到这一点?
我认为这个问题是一个骗局 - 但我的 Google foo 在这里让我失望了。
Ricard Kollcaku 的 that one should use ReplayProcessor
似乎是正确的做事方式。这是我写的另一个例子,目的是让我清楚地了解如何使用它:
ReplayProcessor<Integer> flux = ReplayProcessor.create(Queues.SMALL_BUFFER_SIZE);
FluxSink<Integer> sink = flux.sink();
// ReplayProcessor.getBufferSize() returns unbounded,
// while CAPACITY returns the capacity of the underlying buffer.
int capacity = flux.scan(Scannable.Attr.CAPACITY);
// Add twice as many elements as the underlying buffer can take.
int count = capacity * 2;
for (int i = 0; i < count; i++) {
sink.next(i);
}
// If `capacity` is 256, this will print value 256 thru to 511.
flux.subscribe(System.out::println);
我还发现 this section, in Hands-On Reactive Programming with Reactor,在解释事情时很有用。
你必须像这个例子一样使用 ReplayProcessor
:
ReplayProcessor<Integer> directProcessor = ReplayProcessor.cacheLast();
Flux.range(1, 10)
.map(integer -> {
directProcessor.onNext(integer);
return integer;
}).doOnComplete(() -> {
directProcessor.subscribe(System.out::println);
directProcessor.subscribe(System.out::println);
})
.subscribe();
如何创建只保留 最新 n 个元素的 EmitterProcessor
,这样即使没有订阅者它也能正常工作?
目前我创建了这样的处理器:
EmitterProcessor<Integer> processor = EmitterProcessor.create();
并且外部系统全天随机提供温度更新。在那个系统的回调中我做了:
void tempConsumer(int temp) {
processor.onNext(temp);
}
但是,一旦添加 processor.getBufferSize()
个元素,onNext(...)
就会阻塞。
如何创建一个处理器来丢弃最旧的元素,在这种情况下,而不是阻塞?
reactor-core #763. Simon Baslé first discusses a proposed change 到 EmitterProcessor
似乎在某种程度上涵盖了这一点,因此当 "sending data while there are NO subscribers [and] the queue contains bufferSize
elements, the oldest element is dropped and the onNext
is enqueued." 但在下一条评论中,他说 "we won't go ahead with my suggested change above. We instead advise you to use the sink()
rather than directly the onNext
. Namely, to use the onRequest
callback inside the sink()
to perform exactly as many sink.next(...)
as there are requests."
但是,如果我理解正确的话,这只涵盖了您可以按需计算新元素的情况,例如像这样:
FluxSink<Integer> sink = processor.sink();
Random random = new Random();
sink.onRequest(n -> random.nextInt()); // Generate next n requested elements.
但在我的情况下,我无法按需生成 最新的 n 个温度读数。当然,我可以维护我自己的最新读数的外部有界缓冲区,然后从 onRequest(...)
中读取,但我假设 Reactor 可以为我做到这一点?
我认为这个问题是一个骗局 - 但我的 Google foo 在这里让我失望了。
Ricard Kollcaku 的 ReplayProcessor
似乎是正确的做事方式。这是我写的另一个例子,目的是让我清楚地了解如何使用它:
ReplayProcessor<Integer> flux = ReplayProcessor.create(Queues.SMALL_BUFFER_SIZE);
FluxSink<Integer> sink = flux.sink();
// ReplayProcessor.getBufferSize() returns unbounded,
// while CAPACITY returns the capacity of the underlying buffer.
int capacity = flux.scan(Scannable.Attr.CAPACITY);
// Add twice as many elements as the underlying buffer can take.
int count = capacity * 2;
for (int i = 0; i < count; i++) {
sink.next(i);
}
// If `capacity` is 256, this will print value 256 thru to 511.
flux.subscribe(System.out::println);
我还发现 this section, in Hands-On Reactive Programming with Reactor,在解释事情时很有用。
你必须像这个例子一样使用 ReplayProcessor
:
ReplayProcessor<Integer> directProcessor = ReplayProcessor.cacheLast();
Flux.range(1, 10)
.map(integer -> {
directProcessor.onNext(integer);
return integer;
}).doOnComplete(() -> {
directProcessor.subscribe(System.out::println);
directProcessor.subscribe(System.out::println);
})
.subscribe();