如何根据发出的事件有条件地缓冲分组 Observable/Flux?
How do I conditionally buffer a Grouped Observable/Flux based on Emitted Events?
我正在尝试根据以下信息编写反应流:
我们有一个实体事件流,其中每个事件都包含其实体的 ID 和 INTENT 或 COMMIT 类型。假定具有给定 ID 的 COMMIT 之前总是有一个或多个具有相同 ID 的 INTENT。收到 INTENT 时,应按其 ID 对其进行分组,并应打开该组的 "buffer"。当接收到同一组的 COMMIT 或配置的超时已过时,缓冲区应为 "closed"。应发出生成的缓冲区。
请注意,在收到关闭 COMMIT 之前可能会收到多个 INTENT。 (编辑:)bufferDuration
应该保证任何 "opened" 缓冲区在收到打开缓冲区的 INTENT 后 bufferDuration
时间后发出,有或没有 COMMIT。
我最近的尝试如下:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(groupedFlux -> createGroupBuffer(groupedFlux))
.flatMap(Function.identity());
}
protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
}
protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
}
protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.INTENT;
}
protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.COMMIT;
}
}
这是我试图通过的测试:
@Test
public void entityEventsCanBeBuffered() throws Exception {
FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();
Duration bufferDuration = Duration.ofMillis(250);
Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
bufferFactory.setBufferDuration(bufferDuration);
List<List<EntityEvent>> buffers = new ArrayList<>();
bufferFlux.subscribe(buffers::add);
EntityEvent intent = new EntityEvent();
intent.setId("SOME_ID");
intent.setEventType(EventType.INTENT);
EntityEvent commit = new EntityEvent();
commit.setId(intent.getId());
commit.setEventType(EventType.COMMIT);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
Thread.sleep(500);
assertEquals(2, buffers.size());
assertFalse(buffers.get(0).isEmpty());
assertFalse(buffers.get(1).isEmpty());
}
通过这个测试,我得到了两个发出的缓冲区,但它们都是空的。您会注意到,在深入研究之后,我不得不在某些点添加 .publish()
,以免从 Reactor 获得异常 This processor allows only a single Subscriber
。这个问题的答案 是我采用这种方法的原因。
我目前正在使用 Reactor,但我认为这与使用 Observable 和同名方法的 RxJava 一对一转换。
有什么想法吗?
我认为这是 Rx 的最终用例 groupBy
。来自文档:
Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.
在您的情况下,此标准是 ID,并且在每个向您发出的 GroupedPublisher 上 takeUntil
类型是 COMMIT:
source
.groupBy(EntityEvent::getId)
.flatMap(group ->
group
.takeUntil(Flowable.timer(10,TimeUnit.SECONDS))
.takeUntil(this::shouldCloseBufferOnEvent)
.toList())
编辑:添加时间条件。
感谢 Tassos Bassoukos 的意见。以下 Reactor 代码适用于我:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
@Override
public Flux<List<EntityEvent>> create(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(this::createGroupBuffer)
.flatMap(Function.identity());
}
protected Mono<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.take(bufferDuration)
.takeUntil(this::shouldCloseBufferOnEvent)
.collectList();
}
protected boolean shouldCloseBufferOnEvent(EntityEvent EntityEvent) {
return EntityEvent.getEventType() == EventType.COMMIT;
}
}
我正在尝试根据以下信息编写反应流:
我们有一个实体事件流,其中每个事件都包含其实体的 ID 和 INTENT 或 COMMIT 类型。假定具有给定 ID 的 COMMIT 之前总是有一个或多个具有相同 ID 的 INTENT。收到 INTENT 时,应按其 ID 对其进行分组,并应打开该组的 "buffer"。当接收到同一组的 COMMIT 或配置的超时已过时,缓冲区应为 "closed"。应发出生成的缓冲区。
请注意,在收到关闭 COMMIT 之前可能会收到多个 INTENT。 (编辑:)bufferDuration
应该保证任何 "opened" 缓冲区在收到打开缓冲区的 INTENT 后 bufferDuration
时间后发出,有或没有 COMMIT。
我最近的尝试如下:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(groupedFlux -> createGroupBuffer(groupedFlux))
.flatMap(Function.identity());
}
protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
}
protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
}
protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.INTENT;
}
protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.COMMIT;
}
}
这是我试图通过的测试:
@Test
public void entityEventsCanBeBuffered() throws Exception {
FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();
Duration bufferDuration = Duration.ofMillis(250);
Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
bufferFactory.setBufferDuration(bufferDuration);
List<List<EntityEvent>> buffers = new ArrayList<>();
bufferFlux.subscribe(buffers::add);
EntityEvent intent = new EntityEvent();
intent.setId("SOME_ID");
intent.setEventType(EventType.INTENT);
EntityEvent commit = new EntityEvent();
commit.setId(intent.getId());
commit.setEventType(EventType.COMMIT);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
Thread.sleep(500);
assertEquals(2, buffers.size());
assertFalse(buffers.get(0).isEmpty());
assertFalse(buffers.get(1).isEmpty());
}
通过这个测试,我得到了两个发出的缓冲区,但它们都是空的。您会注意到,在深入研究之后,我不得不在某些点添加 .publish()
,以免从 Reactor 获得异常 This processor allows only a single Subscriber
。这个问题的答案
我目前正在使用 Reactor,但我认为这与使用 Observable 和同名方法的 RxJava 一对一转换。
有什么想法吗?
我认为这是 Rx 的最终用例 groupBy
。来自文档:
Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.
在您的情况下,此标准是 ID,并且在每个向您发出的 GroupedPublisher 上 takeUntil
类型是 COMMIT:
source
.groupBy(EntityEvent::getId)
.flatMap(group ->
group
.takeUntil(Flowable.timer(10,TimeUnit.SECONDS))
.takeUntil(this::shouldCloseBufferOnEvent)
.toList())
编辑:添加时间条件。
感谢 Tassos Bassoukos 的意见。以下 Reactor 代码适用于我:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
@Override
public Flux<List<EntityEvent>> create(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(this::createGroupBuffer)
.flatMap(Function.identity());
}
protected Mono<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.take(bufferDuration)
.takeUntil(this::shouldCloseBufferOnEvent)
.collectList();
}
protected boolean shouldCloseBufferOnEvent(EntityEvent EntityEvent) {
return EntityEvent.getEventType() == EventType.COMMIT;
}
}