Android WorkManager 中的异步 Worker
Asynchronous Worker in Android WorkManager
Google 最近宣布了新的 WorkManager
架构组件。通过在 Worker
class 中实现 doWork()
可以轻松安排同步工作,但是如果我想在后台做一些异步工作?例如,我想使用 Retrofit 进行网络服务调用。我知道我可以发出同步网络请求,但它会阻塞线程并且感觉不对。
有什么解决办法吗?还是目前不支持?
根据 WorkManager docs:
By default, WorkManager runs its operations on a background thread. If you are already running on a background thread and have need for synchronous (blocking) calls to WorkManager, use synchronous() to access such methods.
因此,如果您不使用 synchronous()
,您可以安全地从 doWork()
执行同步网络调用。从设计的角度来看,这也是一种更好的方法,因为回调很混乱。
也就是说,如果你真的想从 doWork()
触发异步作业,你需要暂停执行线程并在异步作业完成后使用 wait/notify
机制(或其他一些线程管理机制,例如 Semaphore
)。在大多数情况下我不会推荐。
附带说明一下,WorkManager 处于非常早期的 alpha 阶段。
我使用了一个倒计时锁存器并等待它达到 0,这只会在异步回调更新它后发生。请参阅此代码:
public WorkerResult doWork() {
final WorkerResult[] result = {WorkerResult.RETRY};
CountDownLatch countDownLatch = new CountDownLatch(1);
FirebaseFirestore db = FirebaseFirestore.getInstance();
db.collection("collection").whereEqualTo("this","that").get().addOnCompleteListener(task -> {
if(task.isSuccessful()) {
task.getResult().getDocuments().get(0).getReference().update("field", "value")
.addOnCompleteListener(task2 -> {
if (task2.isSuccessful()) {
result[0] = WorkerResult.SUCCESS;
} else {
result[0] = WorkerResult.RETRY;
}
countDownLatch.countDown();
});
} else {
result[0] = WorkerResult.RETRY;
countDownLatch.countDown();
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result[0];
}
如果您正在谈论异步作业,您可以将您的工作转移到 RxJava Observables / Singles 中。
有一组运算符,例如 .blockingGet()
或 .blockingFirst()
将 Observable<T>
转换为阻塞 T
Worker
在后台线程上执行,所以不用担心 NetworkOnMainThreadException
。
我用过BlockingQueue
,它简化了线程同步和线程间的结果传递,你只需要一个对象
private var disposable = Disposables.disposed()
private val completable = Completable.fromAction {
//do some heavy computation
}.subscribeOn(Schedulers.computation()) // you will do the work on background thread
override fun doWork(): Result {
val result = LinkedBlockingQueue<Result>()
disposable = completable.subscribe(
{ result.put(Result.SUCCESS) },
{ result.put(Result.RETRY) }
)
return try {
result.take() //need to block this thread untill completable has finished
} catch (e: InterruptedException) {
Result.RETRY
}
}
此外,如果您的 Worker 已停止,请不要忘记释放资源,这是相对于 .blockingGet()
的主要优势,因为现在您可以正确地免费取消您的 Rx 任务。
override fun onStopped(cancelled: Boolean) {
disposable.dispose()
}
仅供参考,现在有 ListenableWorker,它被设计为异步的。
编辑:以下是示例用法的一些片段。我删除了大块我认为不具有说明性的代码,所以这里很可能会出现一两个小错误。
这适用于获取字符串 photoKey、从服务器检索元数据、进行一些压缩工作,然后上传压缩照片的任务。这发生在主线程之外。以下是我们发送工作请求的方式:
private void compressAndUploadFile(final String photoKey) {
Data inputData = new Data.Builder()
.putString(UploadWorker.ARG_PHOTO_KEY, photoKey)
.build();
Constraints constraints = new Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build();
OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(UploadWorker.class)
.setInputData(inputData)
.setConstraints(constraints)
.build();
WorkManager.getInstance().enqueue(request);
}
并且在 UploadWorker 中:
public class UploadWorker extends ListenableWorker {
private static final String TAG = "UploadWorker";
public static final String ARG_PHOTO_KEY = "photo-key";
private String mPhotoKey;
/**
* @param appContext The application {@link Context}
* @param workerParams Parameters to setup the internal state of this worker
*/
public UploadWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
super(appContext, workerParams);
mPhotoKey = workerParams.getInputData().getString(ARG_PHOTO_KEY);
}
@NonNull
@Override
public ListenableFuture<Payload> onStartWork() {
SettableFuture<Payload> future = SettableFuture.create();
Photo photo = getPhotoMetadataFromServer(mPhotoKey).addOnCompleteListener(task -> {
if (!task.isSuccessful()) {
Log.e(TAG, "Failed to retrieve photo metadata", task.getException());
future.setException(task.getException());
return;
}
MyPhotoType photo = task.getResult();
File file = photo.getFile();
Log.d(TAG, "Compressing " + photo);
MyImageUtil.compressImage(file, MyConstants.photoUploadConfig).addOnCompleteListener(compressionTask -> {
if (!compressionTask.isSuccessful()) {
Log.e(TAG, "Could not parse " + photo + " as an image.", compressionTask.getException());
future.set(new Payload(Result.FAILURE));
return;
}
byte[] imageData = compressionTask.getResult();
Log.d(TAG, "Done compressing " + photo);
UploadUtil.uploadToServer(photo, imageData);
future.set(new Payload(Result.SUCCESS));
});
});
return future;
}
}
编辑
根据您在应用程序中使用的东西,您还可以扩展 RxWorker(如果您使用的是 RxJava)或 CoroutineWorker (如果您使用协程)。它们都来自 ListenableWorker.
借助协程的强大功能,您可以 'synchronise' 像这样 doWork()
:
暂停获取位置的方法(异步):
private suspend fun getLocation(): Location = suspendCoroutine { continuation ->
val mFusedLocationClient = LocationServices.getFusedLocationProviderClient(appContext)
mFusedLocationClient.lastLocation.addOnSuccessListener {
continuation.resume(it)
}.addOnFailureListener {
continuation.resumeWithException(it)
}
}
doWork()
中的调用示例:
override fun doWork(): Result {
val loc = runBlocking {
getLocation()
}
val latitude = loc.latitude
}
2021 年更新:
您现在可以使用 CoroutineWorker
,它具有暂停 doWork()
方法。
class MySuspendWorker(private val appContext: Context, workerParams: WorkerParameters) : CoroutineWorker(appContext, workerParams) {
override suspend fun doWork(): Result {
//do your async work
}
}
我也更喜欢@TomH 推荐的方法。不过,我将它与 Firebase Storage 一起使用。将 WorkManager 与 CountDownlatch did the trick for me. Here a code snippet. Logs are done with Timber.
一起使用
它 returns 在任务完成之后但在工作人员 returns 成功之前 returns 来自 Firebase 的 downloadUrl 作为字符串。
@NonNull
@Override
public Result doWork() {
mFirebaseStorage = mFirebaseStorage.getInstance();
mTriviaImageStorageReference = mFirebaseStorage.getReference().child("images");
CountDownLatch countDown = new CountDownLatch(2);
Uri imageUri = Uri.parse(getInputData().getString(KEY_IMAGE_URI));
try {
// get the image reference
final StorageReference imageRef = mTriviaImageStorageReference.child(imageUri.getLastPathSegment());
// upload the image to Firebase
imageRef.putFile(imageUri).continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
@Override
public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
if (!task.isSuccessful()) {
throw task.getException();
}
countDown.countDown();
return imageRef.getDownloadUrl();
}
}).addOnCompleteListener(new OnCompleteListener<Uri>() {
@Override
public void onComplete(@NonNull Task<Uri> task) {
if (task.isSuccessful()) {
Timber.d("Image was successfully uploaded to Firebase");
Uri downloadUri = task.getResult();
String imageUrl = downloadUri.toString();
Timber.d(("URl of the image is: " + imageUrl));
mOutputData = new Data.Builder()
.putString(KEY_FIREBASE_IMAGE_URL, imageUrl)
.build();
countDown.countDown();
} else {
Toast.makeText(getApplicationContext(), "upload failed", Toast.LENGTH_SHORT).show();
countDown.countDown();
}
}
});
countDown.await();
return Result.success(mOutputData);
} catch (Throwable throwable) {
Timber.e(throwable, "Error uploading image");
return Result.failure();
}
}
来晚了,但这可能会对其他人有所帮助,
您可以使用 CoroutineWorker
并在 doWork() 内部使用名为 suspendCancellableCoroutine
的东西,它专门为此目的而设计。
下面是代码片段:
class FileDownloader(private val appContext: Context, params: WorkerParameters) :
CoroutineWorker(appContext, params) {
override suspend fun doWork(): Result {
try {
suspendCancellableCoroutine<Int> { cancellableContinuation ->
// Here you can call your asynchronous callback based network
override fun onComplete() {
cancellableContinuation.resumeWith(
kotlin.Result.success(100))
}
override fun onError(error: Error?) {
cancellableContinuation.resumeWithException(
error?.connectionException ?: Throwable()
)
}
}
}catch (e: Exception) {
return Result.failure()
}
return Result.success()
}
}
在这里,协程将停止,直到您调用 cancellableContinuation.resumeWith。
Google 最近宣布了新的 WorkManager
架构组件。通过在 Worker
class 中实现 doWork()
可以轻松安排同步工作,但是如果我想在后台做一些异步工作?例如,我想使用 Retrofit 进行网络服务调用。我知道我可以发出同步网络请求,但它会阻塞线程并且感觉不对。
有什么解决办法吗?还是目前不支持?
根据 WorkManager docs:
By default, WorkManager runs its operations on a background thread. If you are already running on a background thread and have need for synchronous (blocking) calls to WorkManager, use synchronous() to access such methods.
因此,如果您不使用 synchronous()
,您可以安全地从 doWork()
执行同步网络调用。从设计的角度来看,这也是一种更好的方法,因为回调很混乱。
也就是说,如果你真的想从 doWork()
触发异步作业,你需要暂停执行线程并在异步作业完成后使用 wait/notify
机制(或其他一些线程管理机制,例如 Semaphore
)。在大多数情况下我不会推荐。
附带说明一下,WorkManager 处于非常早期的 alpha 阶段。
我使用了一个倒计时锁存器并等待它达到 0,这只会在异步回调更新它后发生。请参阅此代码:
public WorkerResult doWork() {
final WorkerResult[] result = {WorkerResult.RETRY};
CountDownLatch countDownLatch = new CountDownLatch(1);
FirebaseFirestore db = FirebaseFirestore.getInstance();
db.collection("collection").whereEqualTo("this","that").get().addOnCompleteListener(task -> {
if(task.isSuccessful()) {
task.getResult().getDocuments().get(0).getReference().update("field", "value")
.addOnCompleteListener(task2 -> {
if (task2.isSuccessful()) {
result[0] = WorkerResult.SUCCESS;
} else {
result[0] = WorkerResult.RETRY;
}
countDownLatch.countDown();
});
} else {
result[0] = WorkerResult.RETRY;
countDownLatch.countDown();
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result[0];
}
如果您正在谈论异步作业,您可以将您的工作转移到 RxJava Observables / Singles 中。
有一组运算符,例如 .blockingGet()
或 .blockingFirst()
将 Observable<T>
转换为阻塞 T
Worker
在后台线程上执行,所以不用担心 NetworkOnMainThreadException
。
我用过BlockingQueue
,它简化了线程同步和线程间的结果传递,你只需要一个对象
private var disposable = Disposables.disposed()
private val completable = Completable.fromAction {
//do some heavy computation
}.subscribeOn(Schedulers.computation()) // you will do the work on background thread
override fun doWork(): Result {
val result = LinkedBlockingQueue<Result>()
disposable = completable.subscribe(
{ result.put(Result.SUCCESS) },
{ result.put(Result.RETRY) }
)
return try {
result.take() //need to block this thread untill completable has finished
} catch (e: InterruptedException) {
Result.RETRY
}
}
此外,如果您的 Worker 已停止,请不要忘记释放资源,这是相对于 .blockingGet()
的主要优势,因为现在您可以正确地免费取消您的 Rx 任务。
override fun onStopped(cancelled: Boolean) {
disposable.dispose()
}
仅供参考,现在有 ListenableWorker,它被设计为异步的。
编辑:以下是示例用法的一些片段。我删除了大块我认为不具有说明性的代码,所以这里很可能会出现一两个小错误。
这适用于获取字符串 photoKey、从服务器检索元数据、进行一些压缩工作,然后上传压缩照片的任务。这发生在主线程之外。以下是我们发送工作请求的方式:
private void compressAndUploadFile(final String photoKey) {
Data inputData = new Data.Builder()
.putString(UploadWorker.ARG_PHOTO_KEY, photoKey)
.build();
Constraints constraints = new Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build();
OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(UploadWorker.class)
.setInputData(inputData)
.setConstraints(constraints)
.build();
WorkManager.getInstance().enqueue(request);
}
并且在 UploadWorker 中:
public class UploadWorker extends ListenableWorker {
private static final String TAG = "UploadWorker";
public static final String ARG_PHOTO_KEY = "photo-key";
private String mPhotoKey;
/**
* @param appContext The application {@link Context}
* @param workerParams Parameters to setup the internal state of this worker
*/
public UploadWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
super(appContext, workerParams);
mPhotoKey = workerParams.getInputData().getString(ARG_PHOTO_KEY);
}
@NonNull
@Override
public ListenableFuture<Payload> onStartWork() {
SettableFuture<Payload> future = SettableFuture.create();
Photo photo = getPhotoMetadataFromServer(mPhotoKey).addOnCompleteListener(task -> {
if (!task.isSuccessful()) {
Log.e(TAG, "Failed to retrieve photo metadata", task.getException());
future.setException(task.getException());
return;
}
MyPhotoType photo = task.getResult();
File file = photo.getFile();
Log.d(TAG, "Compressing " + photo);
MyImageUtil.compressImage(file, MyConstants.photoUploadConfig).addOnCompleteListener(compressionTask -> {
if (!compressionTask.isSuccessful()) {
Log.e(TAG, "Could not parse " + photo + " as an image.", compressionTask.getException());
future.set(new Payload(Result.FAILURE));
return;
}
byte[] imageData = compressionTask.getResult();
Log.d(TAG, "Done compressing " + photo);
UploadUtil.uploadToServer(photo, imageData);
future.set(new Payload(Result.SUCCESS));
});
});
return future;
}
}
编辑
根据您在应用程序中使用的东西,您还可以扩展 RxWorker(如果您使用的是 RxJava)或 CoroutineWorker (如果您使用协程)。它们都来自 ListenableWorker.
借助协程的强大功能,您可以 'synchronise' 像这样 doWork()
:
暂停获取位置的方法(异步):
private suspend fun getLocation(): Location = suspendCoroutine { continuation ->
val mFusedLocationClient = LocationServices.getFusedLocationProviderClient(appContext)
mFusedLocationClient.lastLocation.addOnSuccessListener {
continuation.resume(it)
}.addOnFailureListener {
continuation.resumeWithException(it)
}
}
doWork()
中的调用示例:
override fun doWork(): Result {
val loc = runBlocking {
getLocation()
}
val latitude = loc.latitude
}
2021 年更新:
您现在可以使用 CoroutineWorker
,它具有暂停 doWork()
方法。
class MySuspendWorker(private val appContext: Context, workerParams: WorkerParameters) : CoroutineWorker(appContext, workerParams) {
override suspend fun doWork(): Result {
//do your async work
}
}
我也更喜欢@TomH 推荐的方法。不过,我将它与 Firebase Storage 一起使用。将 WorkManager 与 CountDownlatch did the trick for me. Here a code snippet. Logs are done with Timber.
一起使用它 returns 在任务完成之后但在工作人员 returns 成功之前 returns 来自 Firebase 的 downloadUrl 作为字符串。
@NonNull
@Override
public Result doWork() {
mFirebaseStorage = mFirebaseStorage.getInstance();
mTriviaImageStorageReference = mFirebaseStorage.getReference().child("images");
CountDownLatch countDown = new CountDownLatch(2);
Uri imageUri = Uri.parse(getInputData().getString(KEY_IMAGE_URI));
try {
// get the image reference
final StorageReference imageRef = mTriviaImageStorageReference.child(imageUri.getLastPathSegment());
// upload the image to Firebase
imageRef.putFile(imageUri).continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
@Override
public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
if (!task.isSuccessful()) {
throw task.getException();
}
countDown.countDown();
return imageRef.getDownloadUrl();
}
}).addOnCompleteListener(new OnCompleteListener<Uri>() {
@Override
public void onComplete(@NonNull Task<Uri> task) {
if (task.isSuccessful()) {
Timber.d("Image was successfully uploaded to Firebase");
Uri downloadUri = task.getResult();
String imageUrl = downloadUri.toString();
Timber.d(("URl of the image is: " + imageUrl));
mOutputData = new Data.Builder()
.putString(KEY_FIREBASE_IMAGE_URL, imageUrl)
.build();
countDown.countDown();
} else {
Toast.makeText(getApplicationContext(), "upload failed", Toast.LENGTH_SHORT).show();
countDown.countDown();
}
}
});
countDown.await();
return Result.success(mOutputData);
} catch (Throwable throwable) {
Timber.e(throwable, "Error uploading image");
return Result.failure();
}
}
来晚了,但这可能会对其他人有所帮助,
您可以使用 CoroutineWorker
并在 doWork() 内部使用名为 suspendCancellableCoroutine
的东西,它专门为此目的而设计。
下面是代码片段:
class FileDownloader(private val appContext: Context, params: WorkerParameters) :
CoroutineWorker(appContext, params) {
override suspend fun doWork(): Result {
try {
suspendCancellableCoroutine<Int> { cancellableContinuation ->
// Here you can call your asynchronous callback based network
override fun onComplete() {
cancellableContinuation.resumeWith(
kotlin.Result.success(100))
}
override fun onError(error: Error?) {
cancellableContinuation.resumeWithException(
error?.connectionException ?: Throwable()
)
}
}
}catch (e: Exception) {
return Result.failure()
}
return Result.success()
}
}
在这里,协程将停止,直到您调用 cancellableContinuation.resumeWith。