将 Rx Observable 拆分为多个流并单独处理

Split Rx Observable into multiple streams and process individually

这是一张我试图完成的图片。

--a-b-c-a--bbb--a

分成

--a-----a------a --> 流

----b------bbb--- --> b 流

------c---------- --> c 流

那么,就可以

a.subscribe()
b.subscribe()
c.subscribe()

到目前为止,我发现的所有内容都是使用 groupBy() 拆分流,然后将所有内容折叠回单个流并在同一个函数中处理它们。我想要做的是以不同的方式处理每个派生流。

我现在的做法是做一堆过滤器。有更好的方法吗?

您不必从 groupBy 折叠 Observables。您可以改为订阅它们。

像这样:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};

Action1<String> a = s -> System.out.print("-a-");

Action1<String> b = s -> System.out.print("-b-");

Action1<String> c = s -> System.out.print("-c-");

Observable
    .from(inputs)
    .groupBy(s -> s)
    .subscribe((g) -> {
        if ("a".equals(g.getKey())) {
            g.subscribe(a);
        }

        if ("b".equals(g.getKey())) {
            g.subscribe(b);
        }

        if ("c".equals(g.getKey())) {
            g.subscribe(c);
        }
    });

If 语句看起来有点难看,但至少您可以单独处理每个流。也许有办法避免它们。

简单易行,只需使用 filter

scala 中的一个例子

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

您只需要确保源可观察对象是热的。最简单的方法就是share吧。

我一直在考虑这个问题,Tomas 的解决方案还可以,但问题是它将流转换为热可观察对象。

您可以将 sharedefer 结合使用,以便与其他流一起使用冷可观察对象。

例如(Java):

var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
    var shared - originalObservable.share();
    var aSource = shared.filter(x -> x.equals("a"));
    var bSource = shared.filter(x -> x.equals("b"));
    var cSource = shared.filter(x -> x.equals("c"));
    // some logic for sources
    return shared;
});

在 RxJava 中有一个带有函数的特殊版本 publish operator

ObservableTransformer {
  it.publish { shared ->
    Observable.merge(
        shared.ofType(x).compose(transformerherex),
        shared.ofType(y).compose(transformerherey)
    )
  }
}

这将按类型拆分事件流。然后你可以通过组合不同的变压器来单独处理它们。他们都共享单一订阅。