RxJava 可观察对象
RxJava Observable
我正在尝试编写我的第一个 RxJava 示例。
我有一个主 Activity,其中包含一个文本框和三个按钮。第一个按钮在一个单独的类中初始化一个整数;第二个按钮订阅一个本应观察该整数的 Observable;第三个按钮将该整数的值减一。
这是我的代码
package com.someurl.www.myobservable;
import android.support.v7.app.ActionBarActivity;
import android.os.Bundle;
import android.view.Menu;
import android.view.MenuItem;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
/**
 * Demo screen with one text view and three buttons used to experiment with RxJava.
 *
 * <ul>
 *   <li>The "initialize" button stores 6 through {@code myObserve.InitializeBigInt}.</li>
 *   <li>The "subscribe" button subscribes to {@code myObserve.newObservableClass} and appends
 *       every notification it receives to the text view.</li>
 *   <li>The "change" button calls {@code myObserve.DecreaseBigInt} and appends the returned
 *       value to the text view.</li>
 * </ul>
 */
public class MainActivity extends ActionBarActivity {

    TextView tvDisplay;
    Button btnInitialze;
    Button btnSubscribeClass;
    Button btnChangeInt;

    /** For each subscriber: emits "Hello, World!" once and then completes. */
    Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello, World!");
            subscriber.onCompleted();
        }
    });

    /** Inflates the layout, looks up the views and wires one click handler per button. */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        tvDisplay = (TextView) findViewById(R.id.textView);
        btnInitialze = (Button) findViewById(R.id.btnInitialize);
        btnSubscribeClass = (Button) findViewById(R.id.btnSubscribeClass);
        btnChangeInt = (Button) findViewById(R.id.btnChangeInt);

        btnInitialze.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onInitializeClicked();
            }
        });
        btnSubscribeClass.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onSubscribeClicked();
            }
        });
        btnChangeInt.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onChangeClicked();
            }
        });
    }

    /** Adds the items of {@code R.menu.menu_main} to the options menu. */
    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        getMenuInflater().inflate(R.menu.menu_main, menu);
        return true;
    }

    /**
     * Consumes a click on the settings item; every other item is passed on to the superclass.
     */
    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        return item.getItemId() == R.id.action_settings || super.onOptionsItemSelected(item);
    }

    /** Subscribes to {@link #mObservable} and appends its notifications to the text view. */
    public void subNow() {
        Subscriber<String> textPrinter = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                tvDisplay.append(s);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nDone!");
            }
        };
        mObservable.subscribe(textPrinter);
    }

    /** Handler of the "initialize" button: sets the shared integer to 6. */
    private void onInitializeClicked() {
        myObserve.InitializeBigInt(6);
    }

    /**
     * Handler of the "subscribe" button. It uses {@link #subJust()}; {@link #subClassNow()} is
     * the alternative that is not wired to the button.
     */
    private void onSubscribeClicked() {
        subJust();
    }

    /** Handler of the "change" button: decrements the shared integer and prints the result. */
    private void onChangeClicked() {
        final int decreased = myObserve.DecreaseBigInt();
        tvDisplay.append("\nFrom Button " + decreased);
    }

    /** Subscribes a fresh integer subscriber to {@code myObserve.mObservableClass}. */
    private void subClassNow() {
        myObserve.mObservableClass.subscribe(newClassSubscriber());
    }

    /** Subscribes a fresh integer subscriber to {@code myObserve.newObservableClass}. */
    private void subJust() {
        myObserve.newObservableClass.subscribe(newClassSubscriber());
    }

    /**
     * Creates a subscriber that appends each received integer, an error message or the
     * completion marker to the text view. A new instance is returned on every call.
     */
    private Subscriber<Integer> newClassSubscriber() {
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer myInt) {
                tvDisplay.append("\nClass " + myInt);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Class Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nClass Done!");
            }
        };
    }
}
下面是那个单独的类:
package com.someurl.www.myobservable;
import rx.Observable;
import rx.Subscriber;
/**
 * Static holder for one boxed integer together with two observables built around it.
 *
 * Created by Admin on 6/21/15.
 */
public class myObserve {

    /** The shared value; it has no initializer, so it is {@code null} until it is assigned. */
    static Integer myBigInt;

    /**
     * On every subscription hands the subscriber the value {@link #myBigInt} holds at that
     * moment. The subscriber is not retained, so later changes are not pushed, and no
     * completion is signalled.
     */
    public static Observable<Integer> mObservableClass = Observable.create(new CurrentValueSource());

    /**
     * Created from {@code Observable.just(myBigInt)} while this class is being initialized.
     * At that point {@link #myBigInt} has not been assigned yet, so the argument is
     * {@code null}.
     */
    public static Observable<Integer> newObservableClass = Observable.just(myBigInt);

    /** Replaces the shared value with {@code myInt}. */
    public static void InitializeBigInt(Integer myInt) {
        myBigInt = myInt;
    }

    /**
     * Lowers the shared value by one and returns the new value.
     * Unboxing fails with a NullPointerException while the value is still {@code null}.
     */
    public static Integer DecreaseBigInt() {
        myBigInt = myBigInt - 1;
        return myBigInt;
    }

    /** Subscription callback used by {@link #mObservableClass}. */
    private static final class CurrentValueSource implements Observable.OnSubscribe<Integer> {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(myBigInt);
        }
    }
}
当我使用 mObservableClass 进行订阅时,它只发出了 myBigInt 当时的值(6),随后就触发了 onCompleted,显示 Done!
然后我改用 newObservableClass,以为应该使用 .just 而不是 .create,但这次收到的 myBigInt 是 null,接着同样触发了 onCompleted,显示 Done!
有人能为我指明正确的方向,告诉我该如何观察 myBigInt 的值变化吗?
理想情况下,我希望持续观察 myBigInt 的值,直到它减少到零 (0),并在到达零时触发 onCompleted(显示 Done!)。
谢谢,约翰
通常,您可以使用 BehaviorSubject
来保存值并向订阅者通知它的变化。不过,您似乎还需要原子递减功能。试试这个:
/**
 * Static countdown value whose changes are published through an Rx subject.
 *
 * <p>The current number lives in an {@link AtomicInteger}; every accepted change is forwarded
 * to a serialized {@code BehaviorSubject} that callers observe via {@link #valueChanged()}.
 * When a decrement brings the number to zero the subject is completed instead of emitting 0.
 *
 * <p>NOTE(review): under the Rx contract a completed subject is expected to ignore later
 * {@code onNext} calls, so {@link #setValue(int)} after the countdown has finished presumably
 * updates the number without notifying anyone - confirm against the RxJava version in use.
 */
public class AtomicBehaviorSubject {
    private static final AtomicInteger value = new AtomicInteger();
    private static final Subject<Integer, Integer> setter =
            BehaviorSubject.<Integer>create().toSerialized();

    /**
     * Stores {@code newValue} as the current number and publishes it to the subject.
     *
     * @param newValue the number to count down from
     */
    public static void setValue(int newValue) {
        value.set(newValue);
        setter.onNext(newValue);
    }

    /**
     * Atomically lowers the current number by one.
     *
     * <p>Publishes the new number, or completes the subject when the new number is zero.
     * Does nothing when the current number is already zero or negative.
     */
    public static void decrementValue() {
        for (;;) {
            int curr = value.get();
            // Only a positive number can count down to zero. Without this guard a negative
            // number would move further away from zero on every call, never complete the
            // subject, and finally wrap around at Integer.MIN_VALUE.
            if (curr <= 0) {
                return;
            }
            int u = curr - 1;
            // Retry when another thread changed the number between get() and compareAndSet().
            if (value.compareAndSet(curr, u)) {
                if (u == 0) {
                    setter.onCompleted();
                } else {
                    setter.onNext(u);
                }
                return;
            }
        }
    }

    /**
     * Returns the subject as an observable.
     *
     * @return the shared subject that {@link #setValue(int)} and {@link #decrementValue()}
     *         publish to
     */
    public static Observable<Integer> valueChanged() {
        return setter;
    }
}
我正在尝试编写我的第一个 RxJava 示例。
我有一个主 Activity,其中包含一个文本框和三个按钮。第一个按钮在一个单独的类中初始化一个整数;第二个按钮订阅一个本应观察该整数的 Observable;第三个按钮将该整数的值减一。
这是我的代码
package com.someurl.www.myobservable;
import android.support.v7.app.ActionBarActivity;
import android.os.Bundle;
import android.view.Menu;
import android.view.MenuItem;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
/**
 * Demo screen with one text view and three buttons used to experiment with RxJava.
 *
 * <ul>
 *   <li>The "initialize" button stores 6 through {@code myObserve.InitializeBigInt}.</li>
 *   <li>The "subscribe" button subscribes to {@code myObserve.newObservableClass} and appends
 *       every notification it receives to the text view.</li>
 *   <li>The "change" button calls {@code myObserve.DecreaseBigInt} and appends the returned
 *       value to the text view.</li>
 * </ul>
 */
public class MainActivity extends ActionBarActivity {

    TextView tvDisplay;
    Button btnInitialze;
    Button btnSubscribeClass;
    Button btnChangeInt;

    /** For each subscriber: emits "Hello, World!" once and then completes. */
    Observable<String> mObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello, World!");
            subscriber.onCompleted();
        }
    });

    /** Inflates the layout, looks up the views and wires one click handler per button. */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        tvDisplay = (TextView) findViewById(R.id.textView);
        btnInitialze = (Button) findViewById(R.id.btnInitialize);
        btnSubscribeClass = (Button) findViewById(R.id.btnSubscribeClass);
        btnChangeInt = (Button) findViewById(R.id.btnChangeInt);

        btnInitialze.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onInitializeClicked();
            }
        });
        btnSubscribeClass.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onSubscribeClicked();
            }
        });
        btnChangeInt.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                onChangeClicked();
            }
        });
    }

    /** Adds the items of {@code R.menu.menu_main} to the options menu. */
    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        getMenuInflater().inflate(R.menu.menu_main, menu);
        return true;
    }

    /**
     * Consumes a click on the settings item; every other item is passed on to the superclass.
     */
    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        return item.getItemId() == R.id.action_settings || super.onOptionsItemSelected(item);
    }

    /** Subscribes to {@link #mObservable} and appends its notifications to the text view. */
    public void subNow() {
        Subscriber<String> textPrinter = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                tvDisplay.append(s);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nDone!");
            }
        };
        mObservable.subscribe(textPrinter);
    }

    /** Handler of the "initialize" button: sets the shared integer to 6. */
    private void onInitializeClicked() {
        myObserve.InitializeBigInt(6);
    }

    /**
     * Handler of the "subscribe" button. It uses {@link #subJust()}; {@link #subClassNow()} is
     * the alternative that is not wired to the button.
     */
    private void onSubscribeClicked() {
        subJust();
    }

    /** Handler of the "change" button: decrements the shared integer and prints the result. */
    private void onChangeClicked() {
        final int decreased = myObserve.DecreaseBigInt();
        tvDisplay.append("\nFrom Button " + decreased);
    }

    /** Subscribes a fresh integer subscriber to {@code myObserve.mObservableClass}. */
    private void subClassNow() {
        myObserve.mObservableClass.subscribe(newClassSubscriber());
    }

    /** Subscribes a fresh integer subscriber to {@code myObserve.newObservableClass}. */
    private void subJust() {
        myObserve.newObservableClass.subscribe(newClassSubscriber());
    }

    /**
     * Creates a subscriber that appends each received integer, an error message or the
     * completion marker to the text view. A new instance is returned on every call.
     */
    private Subscriber<Integer> newClassSubscriber() {
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer myInt) {
                tvDisplay.append("\nClass " + myInt);
            }

            @Override
            public void onError(Throwable e) {
                tvDisplay.append("Class Error! \n" + e.getMessage());
            }

            @Override
            public void onCompleted() {
                tvDisplay.append("\nClass Done!");
            }
        };
    }
}
下面是那个单独的类:
package com.someurl.www.myobservable;
import rx.Observable;
import rx.Subscriber;
/**
 * Static holder for one boxed integer together with two observables built around it.
 *
 * Created by Admin on 6/21/15.
 */
public class myObserve {

    /** The shared value; it has no initializer, so it is {@code null} until it is assigned. */
    static Integer myBigInt;

    /**
     * On every subscription hands the subscriber the value {@link #myBigInt} holds at that
     * moment. The subscriber is not retained, so later changes are not pushed, and no
     * completion is signalled.
     */
    public static Observable<Integer> mObservableClass = Observable.create(new CurrentValueSource());

    /**
     * Created from {@code Observable.just(myBigInt)} while this class is being initialized.
     * At that point {@link #myBigInt} has not been assigned yet, so the argument is
     * {@code null}.
     */
    public static Observable<Integer> newObservableClass = Observable.just(myBigInt);

    /** Replaces the shared value with {@code myInt}. */
    public static void InitializeBigInt(Integer myInt) {
        myBigInt = myInt;
    }

    /**
     * Lowers the shared value by one and returns the new value.
     * Unboxing fails with a NullPointerException while the value is still {@code null}.
     */
    public static Integer DecreaseBigInt() {
        myBigInt = myBigInt - 1;
        return myBigInt;
    }

    /** Subscription callback used by {@link #mObservableClass}. */
    private static final class CurrentValueSource implements Observable.OnSubscribe<Integer> {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(myBigInt);
        }
    }
}
当我使用 mObservableClass 进行订阅时,它只发出了 myBigInt 当时的值(6),随后就触发了 onCompleted,显示 Done!
然后我改用 newObservableClass,以为应该使用 .just 而不是 .create,但这次收到的 myBigInt 是 null,接着同样触发了 onCompleted,显示 Done!
有人能为我指明正确的方向,告诉我该如何观察 myBigInt 的值变化吗?
理想情况下,我希望持续观察 myBigInt 的值,直到它减少到零 (0),并在到达零时触发 onCompleted(显示 Done!)。
谢谢,约翰
通常,您可以使用 BehaviorSubject
来保存值并向订阅者通知它的变化。不过,您似乎还需要原子递减功能。试试这个:
/**
 * Static countdown value whose changes are published through an Rx subject.
 *
 * <p>The current number lives in an {@link AtomicInteger}; every accepted change is forwarded
 * to a serialized {@code BehaviorSubject} that callers observe via {@link #valueChanged()}.
 * When a decrement brings the number to zero the subject is completed instead of emitting 0.
 *
 * <p>NOTE(review): under the Rx contract a completed subject is expected to ignore later
 * {@code onNext} calls, so {@link #setValue(int)} after the countdown has finished presumably
 * updates the number without notifying anyone - confirm against the RxJava version in use.
 */
public class AtomicBehaviorSubject {
    private static final AtomicInteger value = new AtomicInteger();
    private static final Subject<Integer, Integer> setter =
            BehaviorSubject.<Integer>create().toSerialized();

    /**
     * Stores {@code newValue} as the current number and publishes it to the subject.
     *
     * @param newValue the number to count down from
     */
    public static void setValue(int newValue) {
        value.set(newValue);
        setter.onNext(newValue);
    }

    /**
     * Atomically lowers the current number by one.
     *
     * <p>Publishes the new number, or completes the subject when the new number is zero.
     * Does nothing when the current number is already zero or negative.
     */
    public static void decrementValue() {
        for (;;) {
            int curr = value.get();
            // Only a positive number can count down to zero. Without this guard a negative
            // number would move further away from zero on every call, never complete the
            // subject, and finally wrap around at Integer.MIN_VALUE.
            if (curr <= 0) {
                return;
            }
            int u = curr - 1;
            // Retry when another thread changed the number between get() and compareAndSet().
            if (value.compareAndSet(curr, u)) {
                if (u == 0) {
                    setter.onCompleted();
                } else {
                    setter.onNext(u);
                }
                return;
            }
        }
    }

    /**
     * Returns the subject as an observable.
     *
     * @return the shared subject that {@link #setValue(int)} and {@link #decrementValue()}
     *         publish to
     */
    public static Observable<Integer> valueChanged() {
        return setter;
    }
}