RxJava:如何让代码异步工作?
RxJava: how to make code working asynchronously?
如果我对异步工作的结论有误,请纠正我,并提出任何如何使这项工作异步工作的建议。
此代码在网格单元格中执行计算的设置结果。
正如我假设的那样,异步工作应该按部分显示网格单元格。因此,如果我们有 8 个核心,我们可以看到显示了 8 个单元格,并且在一段时间内显示了另外 8 个单元格,依此类推(如果我设置了一些时间延迟)。但是现在结果单元格显示变成一个一个。
型号:
public class ListRepository implements ListRepositoryInterface {
private final Integer insertValue = 1000000;
private HashMap<String, BaseUnit> unitMap = new HashMap<>();
@Inject
public ListRepository() {}
public PublishSubject<BaseUnit> exec(int inputNumber) {
PublishSubject<BaseUnit> subject = PublishSubject.create();
Observable<BaseUnit> observable = getListObservable(inputNumber)
.subscribeOn(Schedulers.computation())
.flatMap(resultList ->
Observable.fromIterable(resultList)
.flatMap(listElem ->
Observable.fromArray(ListOperationName.values())
.map(operationElem -> {
ListUnit unit = new ListUnit(operationElem, listElem, 0);
calculate(unit, listElem);
unitMap.put(unit.getViewId(), unit);
return unit;
})
)
);
observable.subscribe(subject);
return subject;
}
private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
return Observable.fromCallable(() -> {
ArrayList<List<Integer>> list = new ArrayList<>();
Integer[] populatedArray = new Integer[inputNumber];
Arrays.fill(populatedArray, insertValue);
list.add(new ArrayList<>(Arrays.asList(populatedArray)));
list.add(new LinkedList<>(Arrays.asList(populatedArray)));
list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));
return list;
});
}
private void calculate(ListUnit unit, List<Integer> list) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
double start = getTime();
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (list) {
switch (unit.getOperationName()) {
case ADD_FIRST:
list.add(0, insertValue);
break;
case ADD_MID:
list.add(list.size() / 2, insertValue);
break;
case ADD_LAST:
list.add(insertValue);
break;
case SEARCH:
//noinspection unused
boolean contains = list.contains(insertValue);
break;
case RM_FIRST:
list.remove(0);
break;
case RM_MID:
list.remove(list.size() / 2);
break;
case RM_LAST:
list.remove(list.size() - 1);
break;
}
}
unit.setTime(getTime() - start);
}
private double getTime() {
return System.nanoTime();
}
public HashMap<String, BaseUnit> getUnitMap() {
return unitMap;
}
}
主持人:
public void calculate(int inputNumber) {
fragment.showAllProgressBars();
PublishSubject<BaseUnit> subject = repository.exec(inputNumber);
Disposable disposable = subject.observeOn(AndroidSchedulers.mainThread())
.subscribe(unit -> {
fragment.setCellText(unit.getViewId(), unit.getTimeString());
}, Throwable::printStackTrace);
}
更新:现在我制作了测试示例并尝试正确订阅主题。
如果我使用 subject.onNext() 它会异步工作,但我认为这是错误的,因为它不会检查 subject.hasComplete() 并得到“真”。
请参阅下面的评论“TODO”。
private void run() {
Log.d("APP", "INIT");
PublishSubject<String> subject = exec(1000000);
subject.observeOn(AndroidSchedulers.mainThread())
.subscribe(unit -> {
Log.d("STRING RESULT = ", unit);
if (subject.hasComplete()) {
//TODO: this condition should work if we use observable.subscribe(subject), not subject.onNext
Log.d("SUBJECT", "COMPLETED");
}
}, Throwable::printStackTrace);
}
private int insertValue = 1000000;
public PublishSubject<String> exec(int inputNumber) {
PublishSubject<String> subject = PublishSubject.create();
getListObservable(inputNumber)
.flatMap(resultList -> getOperationsObservable()
.flatMap(operationElem -> getResultListObservable(resultList)
.map(listElem ->
calculate(operationElem, listElem)
//TODO: should be smth like this, i.e. calculate.subscribe(subject)
// .subscribe(subject)
.subscribe(subject::onNext)
)
)
).subscribe();
return subject;
}
private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
return Observable.fromCallable(() -> {
ArrayList<List<Integer>> list = new ArrayList<>();
Integer[] populatedArray = new Integer[inputNumber];
Arrays.fill(populatedArray, insertValue);
list.add(new ArrayList<>(Arrays.asList(populatedArray)));
list.add(new LinkedList<>(Arrays.asList(populatedArray)));
list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));
return list;
}).subscribeOn(Schedulers.computation());
}
private Observable<String> calculate(ListOperationName operationName, List<Integer> list) {
return Observable.fromCallable(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
double start = getTime();
synchronized (list) {
switch (operationName) {
case ADD_FIRST:
list.add(0, insertValue);
break;
case ADD_MID:
list.add(list.size() / 2, insertValue);
break;
case ADD_LAST:
list.add(insertValue);
break;
case SEARCH:
//noinspection unused
boolean contains = list.contains(insertValue);
break;
case RM_FIRST:
list.remove(0);
break;
case RM_MID:
list.remove(list.size() / 2);
break;
case RM_LAST:
list.remove(list.size() - 1);
break;
}
}
return Double.toString(getTime() - start);
}).subscribeOn(Schedulers.newThread());
}
private Observable<ListOperationName> getOperationsObservable() {
return Observable.fromArray(ListOperationName.values());
}
private Observable<List<Integer>> getResultListObservable(ArrayList<List<Integer>> list) {
return Observable.fromIterable(list);
}
private double getTime() {
return System.nanoTime();
}
public enum ListOperationName {
ADD_FIRST,
ADD_MID,
ADD_LAST,
SEARCH,
RM_FIRST,
RM_MID,
RM_LAST;
}
Gradle:
dependencies {
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
}
如果我从您最近示例中的 calculate(...)
方法中删除 .subscribeOn(Schedulers.newThread());
并对 exec(...)
进行以下更改,我似乎获得了所需的并行性。
public Observable<String> exec( int inputNumber )
{
return getListObservable( inputNumber )
.flatMap( resultList -> getOperationsObservable()
.flatMap( operationElem -> getResultListObservable( resultList )
.flatMap( listElem -> Observable.just( listElem )
.subscribeOn( Schedulers.computation() )
.flatMap( __ -> calculate( operationElem, listElem )))));
}
我删除了 PublishSubject
- 这似乎是多余的,您可以订阅结果 Observable
,不是吗?
如果我对异步工作的结论有误,请纠正我,并提出任何如何使这项工作异步工作的建议。
此代码在网格单元格中执行计算的设置结果。 正如我假设的那样,异步工作应该按部分显示网格单元格。因此,如果我们有 8 个核心,我们可以看到显示了 8 个单元格,并且在一段时间内显示了另外 8 个单元格,依此类推(如果我设置了一些时间延迟)。但是现在结果单元格显示变成一个一个。
型号:
public class ListRepository implements ListRepositoryInterface {
private final Integer insertValue = 1000000;
private HashMap<String, BaseUnit> unitMap = new HashMap<>();
@Inject
public ListRepository() {}
public PublishSubject<BaseUnit> exec(int inputNumber) {
PublishSubject<BaseUnit> subject = PublishSubject.create();
Observable<BaseUnit> observable = getListObservable(inputNumber)
.subscribeOn(Schedulers.computation())
.flatMap(resultList ->
Observable.fromIterable(resultList)
.flatMap(listElem ->
Observable.fromArray(ListOperationName.values())
.map(operationElem -> {
ListUnit unit = new ListUnit(operationElem, listElem, 0);
calculate(unit, listElem);
unitMap.put(unit.getViewId(), unit);
return unit;
})
)
);
observable.subscribe(subject);
return subject;
}
private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
return Observable.fromCallable(() -> {
ArrayList<List<Integer>> list = new ArrayList<>();
Integer[] populatedArray = new Integer[inputNumber];
Arrays.fill(populatedArray, insertValue);
list.add(new ArrayList<>(Arrays.asList(populatedArray)));
list.add(new LinkedList<>(Arrays.asList(populatedArray)));
list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));
return list;
});
}
private void calculate(ListUnit unit, List<Integer> list) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
double start = getTime();
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (list) {
switch (unit.getOperationName()) {
case ADD_FIRST:
list.add(0, insertValue);
break;
case ADD_MID:
list.add(list.size() / 2, insertValue);
break;
case ADD_LAST:
list.add(insertValue);
break;
case SEARCH:
//noinspection unused
boolean contains = list.contains(insertValue);
break;
case RM_FIRST:
list.remove(0);
break;
case RM_MID:
list.remove(list.size() / 2);
break;
case RM_LAST:
list.remove(list.size() - 1);
break;
}
}
unit.setTime(getTime() - start);
}
private double getTime() {
return System.nanoTime();
}
public HashMap<String, BaseUnit> getUnitMap() {
return unitMap;
}
}
主持人:
public void calculate(int inputNumber) {
fragment.showAllProgressBars();
PublishSubject<BaseUnit> subject = repository.exec(inputNumber);
Disposable disposable = subject.observeOn(AndroidSchedulers.mainThread())
.subscribe(unit -> {
fragment.setCellText(unit.getViewId(), unit.getTimeString());
}, Throwable::printStackTrace);
}
更新:现在我制作了测试示例并尝试正确订阅主题。 如果我使用 subject.onNext() 它会异步工作,但我认为这是错误的,因为它不会检查 subject.hasComplete() 并得到“真”。 请参阅下面的评论“TODO”。
private void run() {
Log.d("APP", "INIT");
PublishSubject<String> subject = exec(1000000);
subject.observeOn(AndroidSchedulers.mainThread())
.subscribe(unit -> {
Log.d("STRING RESULT = ", unit);
if (subject.hasComplete()) {
//TODO: this condition should work if we use observable.subscribe(subject), not subject.onNext
Log.d("SUBJECT", "COMPLETED");
}
}, Throwable::printStackTrace);
}
private int insertValue = 1000000;
public PublishSubject<String> exec(int inputNumber) {
PublishSubject<String> subject = PublishSubject.create();
getListObservable(inputNumber)
.flatMap(resultList -> getOperationsObservable()
.flatMap(operationElem -> getResultListObservable(resultList)
.map(listElem ->
calculate(operationElem, listElem)
//TODO: should be smth like this, i.e. calculate.subscribe(subject)
// .subscribe(subject)
.subscribe(subject::onNext)
)
)
).subscribe();
return subject;
}
private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
return Observable.fromCallable(() -> {
ArrayList<List<Integer>> list = new ArrayList<>();
Integer[] populatedArray = new Integer[inputNumber];
Arrays.fill(populatedArray, insertValue);
list.add(new ArrayList<>(Arrays.asList(populatedArray)));
list.add(new LinkedList<>(Arrays.asList(populatedArray)));
list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));
return list;
}).subscribeOn(Schedulers.computation());
}
private Observable<String> calculate(ListOperationName operationName, List<Integer> list) {
return Observable.fromCallable(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
double start = getTime();
synchronized (list) {
switch (operationName) {
case ADD_FIRST:
list.add(0, insertValue);
break;
case ADD_MID:
list.add(list.size() / 2, insertValue);
break;
case ADD_LAST:
list.add(insertValue);
break;
case SEARCH:
//noinspection unused
boolean contains = list.contains(insertValue);
break;
case RM_FIRST:
list.remove(0);
break;
case RM_MID:
list.remove(list.size() / 2);
break;
case RM_LAST:
list.remove(list.size() - 1);
break;
}
}
return Double.toString(getTime() - start);
}).subscribeOn(Schedulers.newThread());
}
private Observable<ListOperationName> getOperationsObservable() {
return Observable.fromArray(ListOperationName.values());
}
private Observable<List<Integer>> getResultListObservable(ArrayList<List<Integer>> list) {
return Observable.fromIterable(list);
}
private double getTime() {
return System.nanoTime();
}
public enum ListOperationName {
ADD_FIRST,
ADD_MID,
ADD_LAST,
SEARCH,
RM_FIRST,
RM_MID,
RM_LAST;
}
Gradle:
dependencies {
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
}
如果我从您最近示例中的 calculate(...)
方法中删除 .subscribeOn(Schedulers.newThread());
并对 exec(...)
进行以下更改,我似乎获得了所需的并行性。
public Observable<String> exec( int inputNumber )
{
return getListObservable( inputNumber )
.flatMap( resultList -> getOperationsObservable()
.flatMap( operationElem -> getResultListObservable( resultList )
.flatMap( listElem -> Observable.just( listElem )
.subscribeOn( Schedulers.computation() )
.flatMap( __ -> calculate( operationElem, listElem )))));
}
我删除了 PublishSubject
- 这似乎是多余的,您可以订阅结果 Observable
,不是吗?