RxJava 1.x: 如何在单元测试中模拟背压
RxJava 1.x: How to simulate backpressure in unit test
给定代码:
logItemPublisher
.buffer(
loggingProperties.getBufferTimeoutMillis(),
TimeUnit.MILLISECONDS,
loggingProperties.getBufferSize(),
logDispatchScheduler
)
.onBackpressureDrop(droppedLogsHandler)
// persist uses Spring RestOperations
.flatMap(logs -> persist(logs, url)
.subscribeOn(logDispatchScheduler)
)
.subscribe();
其中,
logItemPublisher = ReplaySubject.<AbstractHttpLogItem>createWithSize(4 * loggingProperties.getBufferSize())
.toSerialized();
logDispatchScheduler = new TestScheduler()
我有一个单元测试:
@Test
public void testLogServiceSlow() {
loggingProperties.setBufferSize(1);
// rx.ring-buffer.size property stores the size of any in-memory ring buffers that RxJava uses when an
// Observable cannot keep up with rate of event emissions.
// default value is 128 on the JVM; RxJava 2.x makes this configurable in flatMap
System.setProperty("rx.ring-buffer.size", "2");
// this is what persist does
when(restOperations.postForEntity(anyString(), any(HttpEntity.class), eq(Void.class)))
.thenAnswer(invocation -> {
Thread.sleep(500);
return ResponseEntity.ok().build();
});
logServiceClient.persistLogs(logs);
scheduler.advanceTimeBy(2L, TimeUnit.SECONDS);
System.clearProperty("rx.ring-buffer.size");
Mockito.verify(restOperations, times(2))
.postForEntity(Mockito.eq("http://log:9000/log/service"), logsCaptors.capture(), eq(Void.class));
}
测试失败,因为尽管有 Thread.sleep
,但没有产生背压。我不明白为什么不;内部环形缓冲区应在 2 个项目后填满,其余项目应被丢弃。
回答我自己的问题:
int ringBufferSize = Optional.ofNullable(System.getProperty("rx.ring-buffer.size"))
.map(Integer::parseInt)
.orElse(128);
logItemPublisher
.buffer(
loggingProperties.getBufferTimeoutMillis(),
TimeUnit.MILLISECONDS,
loggingProperties.getBufferSize(),
logDispatchScheduler
)
.onBackpressureDrop(droppedLogsHandler)
.observeOn(logDispatchScheduler, ringBufferSize)
.flatMap(logs -> persist(logs, url))
.subscribe();
然后在测试中,只需设置ringBufferSize = 2
,不需要设置系统属性 rx.ring-buffer.size
.
给定代码:
logItemPublisher
.buffer(
loggingProperties.getBufferTimeoutMillis(),
TimeUnit.MILLISECONDS,
loggingProperties.getBufferSize(),
logDispatchScheduler
)
.onBackpressureDrop(droppedLogsHandler)
// persist uses Spring RestOperations
.flatMap(logs -> persist(logs, url)
.subscribeOn(logDispatchScheduler)
)
.subscribe();
其中,
logItemPublisher = ReplaySubject.<AbstractHttpLogItem>createWithSize(4 * loggingProperties.getBufferSize())
.toSerialized();
logDispatchScheduler = new TestScheduler()
我有一个单元测试:
@Test
public void testLogServiceSlow() {
loggingProperties.setBufferSize(1);
// rx.ring-buffer.size property stores the size of any in-memory ring buffers that RxJava uses when an
// Observable cannot keep up with rate of event emissions.
// default value is 128 on the JVM; RxJava 2.x makes this configurable in flatMap
System.setProperty("rx.ring-buffer.size", "2");
// this is what persist does
when(restOperations.postForEntity(anyString(), any(HttpEntity.class), eq(Void.class)))
.thenAnswer(invocation -> {
Thread.sleep(500);
return ResponseEntity.ok().build();
});
logServiceClient.persistLogs(logs);
scheduler.advanceTimeBy(2L, TimeUnit.SECONDS);
System.clearProperty("rx.ring-buffer.size");
Mockito.verify(restOperations, times(2))
.postForEntity(Mockito.eq("http://log:9000/log/service"), logsCaptors.capture(), eq(Void.class));
}
测试失败,因为尽管有 Thread.sleep
,但没有产生背压。我不明白为什么不;内部环形缓冲区应在 2 个项目后填满,其余项目应被丢弃。
回答我自己的问题:
int ringBufferSize = Optional.ofNullable(System.getProperty("rx.ring-buffer.size"))
.map(Integer::parseInt)
.orElse(128);
logItemPublisher
.buffer(
loggingProperties.getBufferTimeoutMillis(),
TimeUnit.MILLISECONDS,
loggingProperties.getBufferSize(),
logDispatchScheduler
)
.onBackpressureDrop(droppedLogsHandler)
.observeOn(logDispatchScheduler, ringBufferSize)
.flatMap(logs -> persist(logs, url))
.subscribe();
然后在测试中,只需设置ringBufferSize = 2
,不需要设置系统属性 rx.ring-buffer.size
.