如何帮助 Java 解析我的 RxJava 组合方法中的类型?
How can I help Java resolve the types in my RxJava composition method?
我想链接两个 RxJava Single
实例来创建一个 Observable
来发出它们的两个结果。此外,我需要第一个 Single
的结果来创建第二个。
这是我尝试过的:
public static <A extends C, B extends C, C> Observable<C> chain(final Single<A> a, final Function<A, Single<B>> f) {
return Observable.concat(
a.toObservable(),
a.flatMap(f::apply).toObservable());
}
用法可能如下所示:
final Observable<Event> task = MoreObservables.chain(
writeFile("Hello, world", "hello.txt"),
writeFileEvent -> processFile(writeFileEvent.path));
但是,Java 抱怨它无法解析类型:
Error:(54, 61) java: incompatible types: cannot infer type-variable(s) A,B,C
(argument mismatch; bad return type in lambda expression
io.reactivex.Single<ProcessFileEvent> cannot be converted to io.reactivex.Single<Event>)
当然,ProcessFileEvent
实现了 Event
。
如何编写我的函数以便 Java 可以找出类型?或者有更简单的方法来实现这个吗?
如果不知道确切的 writeFile
和 processFile
签名(从简单模拟应该可以编译),很难说出编译错误的原因。
无论如何,更惯用的方法是将 compose()
方法与自定义 ObservableTransformer
一起使用,以便拥有单个链而不是包装方法,从而降低链的可读性(read this).
这里还有一个逻辑问题,因为你使用 concat()
并使用两次 a Observable
,你实际上将执行 a 操作两次(a 将被订阅两次),这可能会导致性能问题在最不坏的情况下,或者是一个主要的微妙错误。 (在您的示例中,您将两次写入同一个文件)。
我认为你应该在这种情况下使用发布,为了执行一次,与合并一起,这将导致 Observable 发出 A 的结果,然后用 A 的结果执行 B,并发出这个结果:
变压器:
class PublishAndMergeTransformer<A extends C, B extends C, C> implements ObservableTransformer<A, C> {
final Function<A, Single<B>> f;
public PublishAndMergeTransformer(Function<A, Single<B>> f) {
this.f = f;
}
@Override
public ObservableSource<C> apply(Observable<A> a) {
return a.publish(aObservable ->
Observable.merge(
aObservable,
aObservable
.flatMap(a1 -> f.apply(a1).toObservable())
)
);
}
}
和使用示例:
writeFile("Hello, world", "hello.txt")
.toObservable()
.compose(new PublishAndMergeTransformer<>(writeFileEvent -> processFile(writeFileEvent.path)));
我想链接两个 RxJava Single
实例来创建一个 Observable
来发出它们的两个结果。此外,我需要第一个 Single
的结果来创建第二个。
这是我尝试过的:
public static <A extends C, B extends C, C> Observable<C> chain(final Single<A> a, final Function<A, Single<B>> f) {
return Observable.concat(
a.toObservable(),
a.flatMap(f::apply).toObservable());
}
用法可能如下所示:
final Observable<Event> task = MoreObservables.chain(
writeFile("Hello, world", "hello.txt"),
writeFileEvent -> processFile(writeFileEvent.path));
但是,Java 抱怨它无法解析类型:
Error:(54, 61) java: incompatible types: cannot infer type-variable(s) A,B,C
(argument mismatch; bad return type in lambda expression
io.reactivex.Single<ProcessFileEvent> cannot be converted to io.reactivex.Single<Event>)
当然,ProcessFileEvent
实现了 Event
。
如何编写我的函数以便 Java 可以找出类型?或者有更简单的方法来实现这个吗?
如果不知道确切的 writeFile
和 processFile
签名(从简单模拟应该可以编译),很难说出编译错误的原因。
无论如何,更惯用的方法是将 compose()
方法与自定义 ObservableTransformer
一起使用,以便拥有单个链而不是包装方法,从而降低链的可读性(read this).
这里还有一个逻辑问题,因为你使用 concat()
并使用两次 a Observable
,你实际上将执行 a 操作两次(a 将被订阅两次),这可能会导致性能问题在最不坏的情况下,或者是一个主要的微妙错误。 (在您的示例中,您将两次写入同一个文件)。
我认为你应该在这种情况下使用发布,为了执行一次,与合并一起,这将导致 Observable 发出 A 的结果,然后用 A 的结果执行 B,并发出这个结果:
变压器:
class PublishAndMergeTransformer<A extends C, B extends C, C> implements ObservableTransformer<A, C> {
final Function<A, Single<B>> f;
public PublishAndMergeTransformer(Function<A, Single<B>> f) {
this.f = f;
}
@Override
public ObservableSource<C> apply(Observable<A> a) {
return a.publish(aObservable ->
Observable.merge(
aObservable,
aObservable
.flatMap(a1 -> f.apply(a1).toObservable())
)
);
}
}
和使用示例:
writeFile("Hello, world", "hello.txt")
.toObservable()
.compose(new PublishAndMergeTransformer<>(writeFileEvent -> processFile(writeFileEvent.path)));