过滤并重试直到 fiter 通过,通过的项目不会被重新测试
Filtering and retrying until the fiter passes, with passed items not being retested
给定一个表示某些外部可变资源的项目列表,我如何过滤该列表以仅发出某些项目并等待所有项目服从过滤器?
更具体地说:从文件列表中,构建一个 Flowable
按存在性过滤,只允许存在的文件。如果它们不存在,等待 5s 文件存在。
这是我的第一次尝试:
Flowable.fromArray(new File("/tmp/file-1"), new File("/tmp/file-2"))
.map(f -> {
boolean exists = f.exists();
System.out.println(f.getName() + " exists? " + exists);
if(exists) {
return f;
} else {
throw new RuntimeException(f.getName() + " doesn't exist");
}
})
.retryWhen(ft -> {
return ft.flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS));
})
.blockingForEach(f -> System.out.println(f.getName() + " exists!"));
但是这给出了:
file-1 exists? false
file-1 exists? false
file-1 exists? false
file-1 exists? false ** $ touch /tmp/file-1 **
file-1 exists? true
file-2 exists? false
file-1 exists!
file-1 exists? true ** BAD we are retesting! **
file-2 exists? false
file-1 exists! ** BAD the subscriber got a duplicate event! **
即使我在 retryWhen
之后添加 distinct
文件仍然被重新测试。
那么有没有一种方法可以只重新测试那些未通过先前测试的项目(不使用 Observable
之外的可变状态)?
对内部序列执行 retryWhen
并将它们 flatMap
放在一起:
source
.flatMap(file ->
Observable.fromCallable(() -> {
if (file.exists()) {
return file;
}
throw new RuntimeException();
})
.retryWhen(errors -> errors.delay(1, TimeUnit.SECONDS))
)
.blockingForEach(f -> System.out.println(f.getName() + " exists!"));
给定一个表示某些外部可变资源的项目列表,我如何过滤该列表以仅发出某些项目并等待所有项目服从过滤器?
更具体地说:从文件列表中,构建一个 Flowable
按存在性过滤,只允许存在的文件。如果它们不存在,等待 5s 文件存在。
这是我的第一次尝试:
Flowable.fromArray(new File("/tmp/file-1"), new File("/tmp/file-2"))
.map(f -> {
boolean exists = f.exists();
System.out.println(f.getName() + " exists? " + exists);
if(exists) {
return f;
} else {
throw new RuntimeException(f.getName() + " doesn't exist");
}
})
.retryWhen(ft -> {
return ft.flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS));
})
.blockingForEach(f -> System.out.println(f.getName() + " exists!"));
但是这给出了:
file-1 exists? false
file-1 exists? false
file-1 exists? false
file-1 exists? false ** $ touch /tmp/file-1 **
file-1 exists? true
file-2 exists? false
file-1 exists!
file-1 exists? true ** BAD we are retesting! **
file-2 exists? false
file-1 exists! ** BAD the subscriber got a duplicate event! **
即使我在 retryWhen
之后添加 distinct
文件仍然被重新测试。
那么有没有一种方法可以只重新测试那些未通过先前测试的项目(不使用 Observable
之外的可变状态)?
对内部序列执行 retryWhen
并将它们 flatMap
放在一起:
source
.flatMap(file ->
Observable.fromCallable(() -> {
if (file.exists()) {
return file;
}
throw new RuntimeException();
})
.retryWhen(errors -> errors.delay(1, TimeUnit.SECONDS))
)
.blockingForEach(f -> System.out.println(f.getName() + " exists!"));