Reactive parallelization doesn't work
Using Project Reactor 3.0.4.RELEASE. Conceptually, it should be the same in RxJava too.
    public Mono<Map<String, Boolean>> refreshPods(List<String> apps) {
        return pods(apps)
                .filter(this::isRunningAndNotThisApp)
                .groupBy(Item::getName)
                .flatMap(g -> g
                        .distinct(Item::getIp)
                        .collectList()
                        // TODO: This doesn't seem to be working as expected
                        .subscribeOn(Schedulers.newParallel("par-grp"))
                        .flatMap(client::refreshPods))
                .flatMap(m -> Flux.fromIterable(m.entrySet()))
                .collectMap(Map.Entry::getKey, Map.Entry::getValue);
    }
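The idea is to run client.refreshPods in a separate thread for each group.
Edit: I tried publishOn before posting this question, and again after the answers given here, but the output did not change.
Client: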
    public class MyServiceClientImpl implements MyServiceClient {
        private final RestOperations restOperations;
        private final ConfigRefreshProperties configRefreshProperties;

        public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) {
            return Flux.fromIterable(pods)
                    .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())),
                            (x, delay) -> x)
                    .flatMap(this::refreshWithRetry)
                    .collectMap(Tuple2::getT1, Tuple2::getT2);
        }

        private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) {
            return Mono.<Boolean>create(emitter -> {
                try {
                    log.info("Attempting to refresh pod: {}.", pod);
                    ResponseEntity<String> tryRefresh = refresh(pod);
                    if (!tryRefresh.getStatusCode().is2xxSuccessful()) {
                        log.error("Failed to refresh pod: {}.", pod);
                        emitter.success();
                    } else {
                        log.info("Successfully refreshed pod: {}.", pod);
                        emitter.success(true);
                    }
                } catch (Exception e) {
                    emitter.error(e);
                }
            })
                    .map(b -> Tuples.of(pod.getIp(), b))
                    .log(getClass().getName(), Level.FINE)
                    .retryWhen(errors -> {
                        int maxRetries = configRefreshProperties.getMaxRetries();
                        return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i))
                                .flatMap(t -> {
                                    Integer retryCount = t.getT2();
                                    if (retryCount <= maxRetries && shouldRetry(t.getT1())) {
                                        int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds();
                                        long delay = (long) Math.pow(retryDelaySeconds, retryCount);
                                        return Mono.delay(Duration.ofSeconds(delay));
                                    }
                                    log.error("Done retrying to refresh pod: {}.", pod);
                                    return Mono.<Long>empty();
                                });
                    });
        }

        private ResponseEntity<String> refresh(Item pod) {
            return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class);
        }

        private String buildRefreshEndpoint(Item pod) {
            return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh")
                    .buildAndExpand(pod.getIp(), pod.getPort())
                    .toUriString();
        }

        private boolean shouldRetry(Throwable t) {
            boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class)
                    .map(HttpClientErrorException::getStatusCode)
                    .filter(s -> s.is4xxClientError())
                    .isPresent();
            boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class)
                    .isPresent();
            return timeoutError || !clientError;
        }
    }
The problem is that the log statement "Attempting to refresh pod" is printed on the same thread for every group. What am I missing here?
Logs from the test run:
2017-02-07 10:12:55.348 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.357 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.358 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.363 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.364 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.368 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.369 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.372 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.373 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.377 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.378 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
2017-02-07 10:12:55.381 INFO 33905 --- [ timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running).
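Edit: Thanks to the newly provided logs, and as David pointed out in the issue you created, the root cause is your use of interval here. It switches the context to the default TimedScheduler (the same one for all groups). That is why anything done before the call to refreshPods appears to be ignored (the work is done on the interval thread), but a publishOn/subscribeOn placed after the interval operator should work. In short, my advice to use subscribeOn directly after create still stands.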
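The thread hop described above can be reproduced without Reactor. The following is only a plain-JDK analogy, not Reactor code: `CompletableFuture.thenCombine` stands in for `zipWith`, a scheduled executor stands in for `interval`, and `thenCombineAsync` with an explicit executor plays the role of a `publishOn` placed after the zip. The class name `TimerHopDemo` and the thread names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerHopDemo {

    /** Returns { thread used without re-scheduling, thread used with re-scheduling }. */
    static String[] run() {
        // Hypothetical stand-ins: one timer thread and a small worker pool.
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer-1"));
        ExecutorService workers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "par-grp"));
        try {
            // The item is already available, like an element of Flux.fromIterable(pods).
            CompletableFuture<String> item = CompletableFuture.completedFuture("pod");
            // The tick is completed later by the timer thread, like a tick of interval.
            CompletableFuture<Long> tick = new CompletableFuture<>();

            // No re-scheduling: the callback runs on the thread that completes the pair.
            CompletableFuture<String> plain =
                    item.thenCombine(tick, (x, t) -> Thread.currentThread().getName());
            // Explicit hop after the combination, comparable to publishOn after zipWith.
            CompletableFuture<String> hopped =
                    item.thenCombineAsync(tick, (x, t) -> Thread.currentThread().getName(), workers);

            timer.schedule(() -> tick.complete(0L), 100, TimeUnit.MILLISECONDS);
            return new String[] { plain.join(), hopped.join() };
        } finally {
            timer.shutdownNow();
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("without hop: " + threads[0]);
        System.out.println("with hop:    " + threads[1]);
    }
}
```

Whatever thread produced the item, the combined callback ends up on the timer thread unless a hop is requested after the combination. That mirrors why a subscribeOn placed before the interval-based zip had no visible effect in the question.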
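You trigger a blocking behavior (refresh(pod)) that you wrap as a Mono in refreshWithRetry.
Unless you have a strong need to be concurrency-agnostic at this level, I'd advise you to chain your subscribeOn immediately after the create.
This way, there is no surprise when using the Mono: it respects the contract and doesn't block. Like this: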
    return Mono.<Boolean>create(emitter -> {
        //...
    })
    .subscribeOn(Schedulers.newParallel("par-grp"))
    .map(b -> Tuples.of(pod.getIp(), b))
If you want the method to return a concurrency-agnostic publisher, then you need to put the subscribeOn as close as possible to your blocking publisher, so you need to expand the flatMap lambda:
    .flatMap(pods -> client.refreshPods(pods)
        .subscribeOn(Schedulers.newParallel("par-grp"))
    )
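The general idea behind both variants, keeping the blocking call off the caller's thread by attaching the execution context right where the blocking happens, can be sketched with the JDK alone. This is an analogy rather than Reactor code: `CompletableFuture.supplyAsync` with a dedicated pool stands in for `Mono.create(...).subscribeOn(...)`, and `blockingRefresh`, `refreshAll` and the thread names are hypothetical placeholders for the real HTTP call.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingIsolationDemo {

    // Hypothetical stand-in for the blocking refresh(pod) HTTP call.
    // It returns the name of the thread that did the work.
    static String blockingRefresh(String pod) {
        try {
            Thread.sleep(50); // simulate a slow, blocking request
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs one blocking call per pod on a dedicated pool and maps pod -> worker thread name.
    // Assumes a non-empty list, because the pool is sized to the number of pods.
    static Map<String, String> refreshAll(List<String> pods) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(
                pods.size(), r -> new Thread(r, "par-grp-" + counter.incrementAndGet()));
        try {
            Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
            for (String pod : pods) {
                // The caller only gets a future back; the blocking happens on the pool.
                pending.put(pod, CompletableFuture.supplyAsync(() -> blockingRefresh(pod), pool));
            }
            Map<String, String> result = new LinkedHashMap<>();
            pending.forEach((pod, future) -> result.put(pod, future.join()));
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        refreshAll(List.of("news", "parking", "auth"))
                .forEach((pod, thread) -> System.out.println(pod + " refreshed on " + thread));
    }
}
```

Each pod is reported with a different worker thread name, since every blocking call is handed to the pool at the point where it is created instead of relying on the caller to pick a thread.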
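In your code, you put publishOn before flatMap. Methods that combine observables, such as flatMap or zip, do their own re-scheduling when working with asynchronous sources. interval is such an asynchronous source in your case. That is why you get all the results on the 'timer' thread.
1) Use publishOn right before the operation you want to run in parallel. The operation itself should not involve re-scheduling. For example, map is fine, flatMap is not.
2) Use another publishOn right after it to re-schedule the results. Otherwise the subscriber's thread may interfere.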
    Flux.range(1, 100)
        .groupBy(i -> i % 5)
        .flatMap(group -> group
            .publishOn(Schedulers.newParallel("grp", 8))
            .map(v -> {
                // processing here
                String threadName = Thread.currentThread().getName();
                logger.info("processing {} from {} on {}", v, group.key(), threadName);
                return v;
            })
            .publishOn(Schedulers.single())
        )
        .subscribe(v -> logger.info("got {}", v));
If you want to make sure that all items of a group run on the same thread, see this answer:
For completeness, I'm posting an answer myself. With the help of @simon-baslé and @akarnokd, I got it right. Both of the following work. See reactor-core#421 for details.
Solution 1:
    zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
        (x, delay) -> x)
    .publishOn(Schedulers.newParallel("par-grp"))
    .flatMap(this::refreshWithRetry)
Solution 2:
    zipWith(Flux.intervalMillis(1000 * groupMemberDelaySeconds, Schedulers.newTimer("par-grp")),
        (x, delay) -> x)
    .flatMap(this::refreshWithRetry)
No subscribeOn or publishOn is needed in the refreshPods method.