Rx-Java:创建一个可配置的 Observable

Rx-Java: Creating a configurable Observable

我是 RxJava 的新手,我想知道如何创建可配置的 Observable?让我们想象一下,我可以像这样编写一个数据库到数据库的传输:

srcDb.getObservable(Bean.class)
     .sql(selectSql)
     .params(selectParams)
     .subscribe(
          trgDb.getSubscriber(Bean.class)
               .sql(insertSql)
     );

我已经可以用订阅者做到这一点,但我怎样才能以与 Observable 本身相同的方式获得一些小配置?

有两种方法可以做到这一点:

选项 #1:让您自己的对象进行配置,然后让 execute()query()toObservable() 切换域:

 srcDb
 .find(Bean.class)
 .sql(selectSql)
 .params(selectParams)
 .execute()
 .subscribe(
      trgDb.getSubscriber(Bean.class)
           .sql(insertSql)
 );

选项 #2:使用 .compose() 重复使用常用操作:

srcDb
.getObservable(Bean.class)
.compose(addSQLParameters())
.subscribe(
      trgDb.getSubscriber(Bean.class)
           .sql(insertSql)
 );

 <T> Transformer<T,T> addSQLParameters() {
   return obs -> obs.sql(selectSql).params(selectParams);
 }

我建议您使用选项 #1,因为它可以更好地管理您的代码部分。

也许我找到了一个可以接受的解决方法。看来我在这里需要做的是在 Observable 实例化本身之外进行双重绑定。例如。我需要一对相互依赖的 DbObservable 和 DbOnSubscribe,像这样:

DbObservable class:

public class DbObservable<T> extends Observable<T> {

    //Some parameter
    private String sql;

    protected DbObservable(DbOnSubscribe<T> onSub) {
        super(onSub);
    }

    //Getter for DbOnSubscribe
    public String getSql() {
        return sql;
    }

    //Chain parameter modifier
    public DbObservable<T> sql(String sql) {
        this.sql = sql;
        return this;
    }
}

DbOnSubscribe class:

public class DbOnSubscribe<T> implements Observable.OnSubscribe<T> {

    private DbObservable<T> dbObservable;

    @Override
    public void call(Subscriber<? super T> subscriber) {
        String sql = dbObservable.getSql(); //Access SQL param
        subscriber.onNext( (T) sql ); //Use subscriber
        subscriber.onCompleted();
    }

    //Set back-reference
    public void setDbObservable(DbObservable<T> dbObservable) {
        this.dbObservable = dbObservable;
    }
}

最后我们假定的 DbConnector class:

public class DbConnector {

    public DbObservable<String> getObservable() {
        DbOnSubscribe<String> onSub = new DbOnSubscribe<String>();
        DbObservable<String> obs = new DbObservable<>(onSub);
        onSub.setDbObservable(obs);
        return obs;
    }
}

所以当我尝试时...

public class DbObservableTest {

    public static void main(String[] args) {
        DbConnector srcDb = new DbConnector();

        srcDb.getObservable()
                .sql("some SQL")
                .subscribe(System.out::println);
    }
}

……真的有用!它打印出 "some SQL"。

结论

  1. 如果您想要超级干净并且不介意增加一两行代码,请按照 Joel 和 Tassos Bassoukos 的建议选择构建器。
  2. 如果你不怕稍微复杂一点的代码(应该总是封装在某个地方)并且你真的希望这些参数在你自己的 Observable 中,你可以尝试双重绑定的方式
  3. 还有更多选择吗?