将 Rx Observable 拆分为多个流并单独处理
Split Rx Observable into multiple streams and process individually
这是一张我试图完成的图片。
--a-b-c-a--bbb--a
分成
--a-----a------a --> 流
----b------bbb--- --> b 流
------c---------- --> c 流
那么,就可以
a.subscribe()
b.subscribe()
c.subscribe()
到目前为止,我发现的所有内容都是使用 groupBy() 拆分流,然后将所有内容折叠回单个流并在同一个函数中处理它们。我想要做的是以不同的方式处理每个派生流。
我现在的做法是做一堆过滤器。有更好的方法吗?
您不必从 groupBy
折叠 Observables
。您可以改为订阅它们。
像这样:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
If 语句看起来有点难看,但至少您可以单独处理每个流。也许有办法避免它们。
简单易行,只需使用 filter
scala 中的一个例子
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
您只需要确保源可观察对象是热的。最简单的方法就是share
吧。
我一直在考虑这个问题,Tomas 的解决方案还可以,但问题是它将流转换为热可观察对象。
您可以将 share
与 defer
结合使用,以便与其他流一起使用冷可观察对象。
例如(Java):
var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
var shared - originalObservable.share();
var aSource = shared.filter(x -> x.equals("a"));
var bSource = shared.filter(x -> x.equals("b"));
var cSource = shared.filter(x -> x.equals("c"));
// some logic for sources
return shared;
});
在 RxJava 中有一个带有函数的特殊版本 publish operator。
ObservableTransformer {
it.publish { shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
这将按类型拆分事件流。然后你可以通过组合不同的变压器来单独处理它们。他们都共享单一订阅。
这是一张我试图完成的图片。
--a-b-c-a--bbb--a
分成
--a-----a------a --> 流
----b------bbb--- --> b 流
------c---------- --> c 流
那么,就可以
a.subscribe()
b.subscribe()
c.subscribe()
到目前为止,我发现的所有内容都是使用 groupBy() 拆分流,然后将所有内容折叠回单个流并在同一个函数中处理它们。我想要做的是以不同的方式处理每个派生流。
我现在的做法是做一堆过滤器。有更好的方法吗?
您不必从 groupBy
折叠 Observables
。您可以改为订阅它们。
像这样:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
If 语句看起来有点难看,但至少您可以单独处理每个流。也许有办法避免它们。
简单易行,只需使用 filter
scala 中的一个例子
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
您只需要确保源可观察对象是热的。最简单的方法就是share
吧。
我一直在考虑这个问题,Tomas 的解决方案还可以,但问题是它将流转换为热可观察对象。
您可以将 share
与 defer
结合使用,以便与其他流一起使用冷可观察对象。
例如(Java):
var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
var shared - originalObservable.share();
var aSource = shared.filter(x -> x.equals("a"));
var bSource = shared.filter(x -> x.equals("b"));
var cSource = shared.filter(x -> x.equals("c"));
// some logic for sources
return shared;
});
在 RxJava 中有一个带有函数的特殊版本 publish operator。
ObservableTransformer {
it.publish { shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
这将按类型拆分事件流。然后你可以通过组合不同的变压器来单独处理它们。他们都共享单一订阅。