rxjava2:使用连接的可完成项并在 IO 线程中观察它们
rxjava2: Using concatenated completables and observing them in IO thread
起初,我知道不应该从主线程调用网络操作。这就是为什么我在 Schedulers.io()!
上观察可完成项
我正在尝试连接两个可完成的。两者都可以使用网络,这就是我订阅 Schedulers.io() 的原因。如果我使用 concatWith(or andThen),代码会因 NetworkOnMainThreadException 而失败。这是科特林代码:
val singleSubject = SingleSubject.create<String>();
completalbe1.concatWith(completable2)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)//here i got exception
singleSubject.onError(error)
})
return singleSubject
如果我在没有可完成链接的情况下重写代码 - 一切正常。这是工作代码:
val singleSubject = SingleSubject.create<String>();
completable1
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
completable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)
singleSubject.onError(error)
})
}, {error ->
Log.e(tag, error.message, error)
singleSubject.onError(error)
})
return singleSubject
我想知道为什么第一个片段不起作用但第二个片段起作用?
UPD1:这是堆栈跟踪:
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1273)
at libcore.io.BlockGuardOs.recvfrom(BlockGuardOs.java:249)
at libcore.io.IoBridge.recvfrom(IoBridge.java:549)
at java.net.PlainSocketImpl.read(PlainSocketImpl.java:481)
at java.net.PlainSocketImpl.access[=12=]0(PlainSocketImpl.java:37)
at java.net.PlainSocketImpl$PlainSocketInputStream.read(PlainSocketImpl.java:237)
at okio.Okio.read(Okio.java:139)
at okio.AsyncTimeout.read(AsyncTimeout.java:237)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
at okhttp3.internal.connection.RealConnection.isHealthy(RealConnection.java:498)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:133)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:211)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall.execute(RealCall.java:69)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10955)
at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10955)
at io.reactivex.internal.operators.observable.ObservableIgnoreElementsCompletable.subscribeActual(ObservableIgnoreElementsCompletable.java:31)
at io.reactivex.Completable.subscribe(Completable.java:1664)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65)
at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
at com.catalyst.opti.AppManager$transferImage$subscribe.onStateChanged(AppManager.kt:323)
at com.amazonaws.mobileconnectors.s3.transferutility.TransferStatusUpdater.run(TransferStatusUpdater.java:172)
at android.os.Handler.handleCallback(Handler.java:742)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:5527)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:739)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:629)
UPD2:
completable1 是一个将文件上传到 AWS S3 的函数:
private fun transferImage(imageName: String, image: File): Completable {
return Completable.create(object : CompletableOnSubscribe {
override fun subscribe(e: CompletableEmitter) {
val transferObserver = transferUtility.upload("some", imageName, image)
transferObserver.setTransferListener(object : TransferListener {
override fun onProgressChanged(id: Int, bytesCurrent: Long, bytesTotal: Long) {
Log.i(tag, "bytesCurrent: $bytesCurrent, bytesTotal: $bytesTotal")
}
override fun onStateChanged(id: Int, state: TransferState?) {
if (state == TransferState.COMPLETED) {
e.onComplete()
}
}
override fun onError(id: Int, ex: java.lang.Exception) {
Log.d(tag, "error transfer s3: ${ex.message}", ex)
e.onError(ex)
}
})
}
});
}
completable2 是 retrofit2 调用:
@POST("some")
fun verifyLocation(@Header(AUTH_TOKEN_HEADER) authToken: String, @Body
verifyLocation: VerifyLocation): Completable
我猜想 transferObserver.setTransferListener
在主线程上调用回调,然后主线程也会订阅 completable2
。您必须将 subscribeOn(Schedulers.io())
应用于 completable2
,就像在您的其他示例中一样。
val singleSubject = SingleSubject.create<String>();
completalbe1.subscribeOn(Schedulers.io())
.concatWith(completable2.subscribeOn(Schedulers.io())) // <-----------------------
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)//here i got exception
singleSubject.onError(error)
})
return singleSubject
subscribeOn
影响订阅(副作用)但你的 completalbe1
在主线程上调用 onComplete
时有观察效果。
起初,我知道不应该从主线程调用网络操作。这就是为什么我在 Schedulers.io()!
上观察可完成项我正在尝试连接两个可完成的。两者都可以使用网络,这就是我订阅 Schedulers.io() 的原因。如果我使用 concatWith(or andThen),代码会因 NetworkOnMainThreadException 而失败。这是科特林代码:
val singleSubject = SingleSubject.create<String>();
completalbe1.concatWith(completable2)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)//here i got exception
singleSubject.onError(error)
})
return singleSubject
如果我在没有可完成链接的情况下重写代码 - 一切正常。这是工作代码:
val singleSubject = SingleSubject.create<String>();
completable1
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
completable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)
singleSubject.onError(error)
})
}, {error ->
Log.e(tag, error.message, error)
singleSubject.onError(error)
})
return singleSubject
我想知道为什么第一个片段不起作用但第二个片段起作用?
UPD1:这是堆栈跟踪:
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1273)
at libcore.io.BlockGuardOs.recvfrom(BlockGuardOs.java:249)
at libcore.io.IoBridge.recvfrom(IoBridge.java:549)
at java.net.PlainSocketImpl.read(PlainSocketImpl.java:481)
at java.net.PlainSocketImpl.access[=12=]0(PlainSocketImpl.java:37)
at java.net.PlainSocketImpl$PlainSocketInputStream.read(PlainSocketImpl.java:237)
at okio.Okio.read(Okio.java:139)
at okio.AsyncTimeout.read(AsyncTimeout.java:237)
at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
at okhttp3.internal.connection.RealConnection.isHealthy(RealConnection.java:498)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:133)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:211)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall.execute(RealCall.java:69)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10955)
at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10955)
at io.reactivex.internal.operators.observable.ObservableIgnoreElementsCompletable.subscribeActual(ObservableIgnoreElementsCompletable.java:31)
at io.reactivex.Completable.subscribe(Completable.java:1664)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89)
at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65)
at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
at com.catalyst.opti.AppManager$transferImage$subscribe.onStateChanged(AppManager.kt:323)
at com.amazonaws.mobileconnectors.s3.transferutility.TransferStatusUpdater.run(TransferStatusUpdater.java:172)
at android.os.Handler.handleCallback(Handler.java:742)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:5527)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:739)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:629)
UPD2:
completable1 是一个将文件上传到 AWS S3 的函数:
private fun transferImage(imageName: String, image: File): Completable {
return Completable.create(object : CompletableOnSubscribe {
override fun subscribe(e: CompletableEmitter) {
val transferObserver = transferUtility.upload("some", imageName, image)
transferObserver.setTransferListener(object : TransferListener {
override fun onProgressChanged(id: Int, bytesCurrent: Long, bytesTotal: Long) {
Log.i(tag, "bytesCurrent: $bytesCurrent, bytesTotal: $bytesTotal")
}
override fun onStateChanged(id: Int, state: TransferState?) {
if (state == TransferState.COMPLETED) {
e.onComplete()
}
}
override fun onError(id: Int, ex: java.lang.Exception) {
Log.d(tag, "error transfer s3: ${ex.message}", ex)
e.onError(ex)
}
})
}
});
}
completable2 是 retrofit2 调用:
@POST("some")
fun verifyLocation(@Header(AUTH_TOKEN_HEADER) authToken: String, @Body
verifyLocation: VerifyLocation): Completable
我猜想 transferObserver.setTransferListener
在主线程上调用回调,然后主线程也会订阅 completable2
。您必须将 subscribeOn(Schedulers.io())
应用于 completable2
,就像在您的其他示例中一样。
val singleSubject = SingleSubject.create<String>();
completalbe1.subscribeOn(Schedulers.io())
.concatWith(completable2.subscribeOn(Schedulers.io())) // <-----------------------
.observeOn(Schedulers.io())
.subscribe({
singleSubject.onSuccess("ok")
}, { error ->
Log.e(tag, error.message, error)//here i got exception
singleSubject.onError(error)
})
return singleSubject
subscribeOn
影响订阅(副作用)但你的 completalbe1
在主线程上调用 onComplete
时有观察效果。