RxJava unsubscribeOn with a recursive call does not run on the right thread
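I built a headless WebView in an Android application to scrape URLs from web pages. Each time I retrieve a URL, I may need to scrape again on the page that URL points to. I am using RxJava to run these operations concurrently, and the flatMap function to make the recursive call.
The problem is that the WebView must be disposed on the main thread, so I tried adding .unsubscribeOn(AndroidSchedulers.mainThread()), but it does not seem to work: the dispose() method of HeadlessRequest is called on the thread of the last observeOn(Schedulers.computation()) call. What should I change so that dispose() runs on the main thread?
Here is my code:
HeadlessRequest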
public class HeadlessRequest implements Disposable {
    ...
    private class HeadlessWebView extends WebView {
        ...
        private void destroyWebView() {
            this.removeAllViews();
            this.clearCache(false);
            this.loadUrl("about:blank");
            this.onPause();
            this.removeAllViews();
            this.destroy();
            this.isDisposed = true;
        }
    }

    @Override
    public void dispose() {
        // This doesn't print the mainThread id
        Log.d(TAG, "Disposing on thread " + Thread.currentThread().getId());
        this.webView.destroyWebView();
        this.webView = null;
    }

    @Override
    public boolean isDisposed() {
        return (this.webView == null || this.webView.isDisposed);
    }
}
Web utilities
public static Single<Document> downloadPageHeadless(final String url, final int delay, final Context context) {
    return Single.create((SingleEmitter<Document> emitter) -> {
        try {
            emitter.setDisposable(new HeadlessRequest(url, USER_AGENT, delay, context, emitter::onSuccess, emitter::onError));
        } catch (Exception e) {
            emitter.onError(e);
        }
    }).unsubscribeOn(AndroidSchedulers.mainThread()) // It MUST be executed on the mainThread
      .subscribeOn(AndroidSchedulers.mainThread());
}
Server service
private static Single<String> resolveRecursive(String url, Context context) {
    Server server = getServerInstance(url, context);
    if (server == null) {
        return Single.error(new UnsupportedOperationException("Server for " + url + " not supported"));
    } else if (server.isVideo()) {
        return server.resolve(url, context); // This method returns a Single with observeOn(Schedulers.computation())
    } else {
        return server.resolve(url, context)
                .observeOn(Schedulers.computation())
                .flatMap(resolvedUrl -> resolveRecursive(resolvedUrl, context));
    }
}

public static Single<String> resolveURL(String url, Context context) {
    return resolveRecursive(url, context)
            .observeOn(AndroidSchedulers.mainThread());
}
In the end I found another way to dispose the WebView on the main thread without relying on RxJava: I used the WebView's post method.
private void destroyWebView() {
    this.post(() -> {
        this.removeAllViews();
        this.clearCache(false);
        this.loadUrl("about:blank");
        this.onPause();
        this.removeAllViews();
        this.destroy();
        this.isDisposed = true;
    });
}
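The idea behind post() can also be sketched in plain Java, without Android or RxJava: dispose() may be called from any thread, but the teardown itself is handed to the thread that owns the resource. This is only an illustration under stated assumptions. A single-thread ExecutorService named "owner" stands in for the Android main looper, and OwnerThreadDispose, ThreadBoundResource and demo() are hypothetical names that do not appear in the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OwnerThreadDispose {

    /** Hypothetical stand-in for a resource that must be torn down on one specific thread. */
    static final class ThreadBoundResource {
        // Assumption: this executor plays the role of the Android main looper.
        private final ExecutorService owner;
        // Records the name of the thread that actually ran the teardown.
        private final AtomicReference<String> disposedOn = new AtomicReference<>();

        ThreadBoundResource(ExecutorService owner) {
            this.owner = owner;
        }

        /** Safe to call from any thread: the teardown is posted to the owner thread. */
        void dispose() {
            owner.execute(() ->
                    disposedOn.compareAndSet(null, Thread.currentThread().getName()));
        }

        String disposedOn() {
            return disposedOn.get();
        }
    }

    /** Calls dispose() from a worker thread and reports where the teardown ran. */
    static String demo() {
        ExecutorService owner =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "owner"));
        ThreadBoundResource resource = new ThreadBoundResource(owner);
        try {
            // dispose() is invoked on "worker", like an Rx computation thread would do.
            Thread worker = new Thread(resource::dispose, "worker");
            worker.start();
            worker.join();

            // Let the queued teardown finish before reading the result.
            owner.shutdown();
            owner.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resource.disposedOn();
    }

    public static void main(String[] args) {
        System.out.println("disposed on: " + demo()); // prints "disposed on: owner"
    }
}
```

As with the post() version, the teardown is asynchronous: when dispose() returns, the cleanup has only been queued, so the caller should not assume the resource is already gone.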