RxJava 可观察对象

RxJava Observable

我正在尝试处理我的第一个 RxJava 示例

我有一个带有文本框和三个按钮的主 activity。第一个按钮在单独的 class 中初始化一个整数。第二个按钮订阅了一个假设正在观察整数的可观察对象。第三个按钮将整数的值减一。

这是我的代码

package com.someurl.www.myobservable;

import android.support.v7.app.ActionBarActivity;
import android.os.Bundle;
import android.view.Menu;
import android.view.MenuItem;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;


public class MainActivity extends ActionBarActivity {

    TextView tvDisplay;

    Button btnInitialze;
    Button btnSubscribeClass;
    Button btnChangeInt;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        tvDisplay = (TextView)findViewById(R.id.textView);

        btnInitialze = (Button)findViewById(R.id.btnInitialize);
        btnSubscribeClass = (Button)findViewById(R.id.btnSubscribeClass);
        btnChangeInt = (Button)findViewById(R.id.btnChangeInt);



        btnInitialze.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                myObserve.InitializeBigInt(6);
            }
        });

        btnSubscribeClass.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                //subClassNow();
                subJust();
            }
        });

        btnChangeInt.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                int myNewInt = myObserve.DecreaseBigInt();
                tvDisplay.append("\nFrom Button " + String.valueOf(myNewInt));
            }
        });

    }



    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.
        getMenuInflater().inflate(R.menu.menu_main, menu);
        return true;
    }

    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        // Handle action bar item clicks here. The action bar will
        // automatically handle clicks on the Home/Up button, so long
        // as you specify a parent activity in AndroidManifest.xml.
        int id = item.getItemId();

        //noinspection SimplifiableIfStatement
        if (id == R.id.action_settings) {
            return true;
        }

        return super.onOptionsItemSelected(item);
    }

    Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello, World!");
            subscriber.onCompleted();
        }
    });

    public void subNow() {
         mObservable.subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                tvDisplay.append("\nDone!");

            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Error! \n" + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                tvDisplay.append(s);
            }
        });
    }

    private void subClassNow() {

        myObserve.mObservableClass.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                tvDisplay.append("\nClass Done!");
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Class Error! \n" + e.getMessage());
            }

            @Override
            public void onNext(Integer myInt) {
                tvDisplay.append("\nClass " + String.valueOf(myInt));
            }

        });
    }

    private void subJust() {
        myObserve.newObservableClass.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                tvDisplay.append("\nClass Done!");
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Class Error! \n" + e.getMessage());
            }

            @Override
            public void onNext(Integer myInt) {
                tvDisplay.append("\nClass " + String.valueOf(myInt));
            }
        });
    }
}

和 class

package com.someurl.www.myobservable;

import rx.Observable;
import rx.Subscriber;

/**
 * Created by Admin on 6/21/15.
 */
public class myObserve {
    static Integer myBigInt;

    public static Observable<Integer> mObservableClass = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(myBigInt);
            //subscriber.onCompleted();
        }

    });

    public static void InitializeBigInt(Integer myInt){
        myBigInt = myInt;
    }

    public static Integer DecreaseBigInt(){
        myBigInt --;
        return myBigInt;
    }

    public static Observable<Integer> newObservableClass = Observable.just(myBigInt);

}

当我尝试使用 mObservableClass 进行订阅时,它只给了我 myBigInt 的值(当时是 6),然后它给了我 onComplete 完成!

然后我尝试使用 newObservableClass 认为我需要使用,而不是 .create,但后来我得到 return 为 null myBigInt 然后再次 onComplete 完成!

有人可以帮助我开始正确的方向,了解如何观察 myBigInt 的值变化。理想情况下,我想观察 myBigInt 的值,直到它减少到零 (0),然后在零时调用 onComplete Done!

谢谢,约翰

通常,您希望使用 BehaviorSubject 来存储您的值并传达对其的更改。但是,您似乎也需要原子递减功能。试试这个:

public class AtomicBehaviorSubject {
    private static final AtomicInteger value = new AtomicInteger();
    private static final Subject<Integer, Integer> setter = 
            BehaviorSubject.<Integer>create().toSerialized();

    public static void setValue(int newValue) {
        value.set(newValue);
        setter.onNext(newValue);
    }

    public static void decrementValue() {
        for (;;) {
            int curr = value.get();
            if (curr == 0) {
                return;
            }
            int u = curr - 1;
            if (value.compareAndSet(curr, u)) {
                if (u == 0) {
                    setter.onCompleted();
                } else {
                    setter.onNext(u);
                }
                return;
            }
        }
    }

    public static Observable<Integer> valueChanged() {
        return setter;
    }
}