ThreadPoolExecutor : 获取正在执行的特定Runnable
ThreadPoolExecutor : Get a specific Runnable that is being executed
我正在使用 ThreadPoolExecutor
在后台执行多个长 运行 任务,ThreadPoolExecutor
的池大小为 4,因此当添加超过 4 个任务时,它们将被推送到队列,当 4 个任务之一完成时,从队列中弹出一个任务以供执行。
我想知道有什么方法可以访问 Runnable
当前正在执行但不在队列中的对象,即前 4 个任务。
目标:我想这样做是为了在任何给定点获取任务的当前状态,在 mThreadPoolExecutor.getQueue()
的帮助下,我正在访问正在排队并准备执行的任务,请建议我访问任务的方法这些目前正在执行,以便我可以在需要时附加和删除 listener/handler。
我的可运行class:
public class VideoFileUploadRunner implements Runnable {
private final VideoFileSync mVideoFileSync;
private final DataService dataService;
private Handler handler;
public VideoFileUploadRunner(VideoFileSync videoFileSync, DataService dataService) {
this.mVideoFileSync = videoFileSync;
this.dataService = dataService;
}
public int getPK()
{
return mVideoFileSync.get_idPrimaryKey();
}
public void setHandler(Handler handler) {
this.handler = handler;
}
@Override
public void run() {
try {
if (mVideoFileSync.get_idPrimaryKey() < 0) {
addEntryToDataBase();
}
updateStatus(VideoUploadStatus.IN_PROGRESS);
FileUploader uploader = new FileUploader();
updateStatus(uploader.uploadFile(mVideoFileSync.getVideoFile()));
} catch (Exception e) {
updateStatus(VideoUploadStatus.FAILED);
e.printStackTrace();
}
}
private void addEntryToDataBase() {
int pk = dataService.saveVideoRecordForSync(mVideoFileSync);
mVideoFileSync.set_idPrimaryKey(pk);
}
private void updateStatus(VideoUploadStatus status) {
if (handler != null) {
Message msg = new Message();
Bundle b = new Bundle();
b.putString(AppConstants.Sync_Status, status.toString());
msg.setData(b);
handler.sendMessage(msg);
}
dataService.updateUploadStatus(mVideoFileSync.get_idPrimaryKey(), status.toString());
}
}
在任务进度列表视图中:
public void setData(VideoFileSync fileSync) {
tvIso.setText(fileSync.getVideoFile().getISO_LOOP_EQUP());
tvUnit.setText(fileSync.getVideoFile().getUnit());
tvName.setText(fileSync.getVideoFile().getLocalPath());
tvStatus.setText(fileSync.getCurentStatus().toString());
addHandleForUpdate(fileSync);
}
private void addHandleForUpdate(VideoFileSync fileSync) {
Handler.Callback callBack = new Handler.Callback() {
@Override
public boolean handleMessage(Message msg) {
if(msg.getData()!=null)
{
tvStatus.setText(msg.getData().getString(AppConstants.Sync_Status));
}
return false;
}
};
mHadler = new Handler(Looper.getMainLooper(),callBack);
VideoFileUploadRunner runner = VideoUploadManager.getInstance().getRunnerForSyncFile(fileSync);
if(runner!=null)
runner.setHandler(mHadler);
}
在 VideoUploadManager 中,我有以下 return Runnable
对象的方法,在这里我需要帮助,以便我可以 return 当前正在执行的任务。
public synchronized VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
Iterator<Runnable> itr = mThreadPoolExecutor.getQueue().iterator();
while (itr.hasNext()) {
VideoFileUploadRunner runner = (VideoFileUploadRunner) itr.next();
if (runner.getPK() == fileSync.get_idPrimaryKey()) {
return runner;
}
}
return null;
}
最好的方法是公开一个同步变量,其中包含有关当前正在执行的任务的信息。
public MyTask implements Runnable {
private String id;
private Map<String, MyTask> mapTasks;
public MyTask(String id, Map<String, MyTask> mapTasks) {
this.id = id;
this.mapTasks = mapTasks;
}
public void run() {
synchronized(mapTasks) {
mapTasks.put(id, this);
}
...
synchronized(mapTasks) {
mapTasks.remove(id);
}
}
}
// Create a map of tasks
Map<String, MyTask> mapTasks = new HashMap<String, MyTask>();
// How to create tasks
MyTask myTask1 = new MyTask("task1", mapTasks);
MyTask myTask2 = new MyTask("task2", mapTasks);
executorService.execute(myTask1);
executorService.execute(myTask2);
....
并打印当前正在执行的任务列表:
public void printCurrentExecutingTasks(Map<String, MyTask> tasks) {
for (String id: tasks.keySet()) {
System.out.println("Executing task with id: " + id);
}
}
我的回答集中在问题上:"how to know which runnables are being executed".
这种方法保留一组并发的活动 Runnable:
private final Set<VideoFileUploadRunner> active = Collections.newSetFromMap(new ConcurrentHashMap<>());
提交给 ThreadPoolExecutor 的 Runnable 应该用更新此集合的 Runnable 装饰:
class DecoratedRunnable implements Runnable {
final VideoFileUploadRunner runnable;
public DecoratedRunnable(VideoFileUploadRunner runnable) {
this.runnable = runnable;
}
@Override
public void run() {
active.add(runnable); // add to set
try {
runnable.run();
} finally {
active.remove(runnable); // finally remove from set (even when something goes wrong)
}
}
}
所以我们可以在提交之前装饰 VideoFileUploadRunner
个实例:
executorService.submit(new DecoratedRunnable(videoFileUploadRunner));
方法 getRunnerForSyncFile
将像这样简单地实现:
public VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
return active.stream()
.filter(videoFileUploadRunner -> videoFileUploadRunner.getPK() == fileSync.get_idPrimaryKey())
.findAny()
.orElse(null);
}
Remark :正如@Charlie 评论的那样,这不是将侦听器附加到 Runnable 的最佳方式。您可以请求从 VideoFileUploadRunner
的 run()
方法内部设置消息处理程序,或者使用 MessageHandler 集初始化此类实例,或者使用这种装饰方法使其远离 VideoFileUploadRunner
class.
这个答案与我上面的评论有关。
与其尝试通过执行器找到 运行nable 并为其附加一个侦听器,不如在创建 运行nable 和 post 事件时将侦听器绑定到 运行nable从 运行nable 的执行代码到监听器。
只有当前活跃的 运行 启用才会 post 他们的更新。
这是一个例子。
为您的侦听器创建一个接口来实现。您的监听器可以是线程池执行器、私有内部 class 等
/**
* Callback interface to notify when a video upload's state changes
*/
interface IVideoUploadListener {
/**
* Called when a video upload's state changes
* @param pUploadId The ID of the video upload
* @param pStatus The new status of the upload
*/
void onStatusChanged(int pUploadId, VideoUploadStatus pStatus);
}
为您的状态类型创建枚举(例如)
/**
* Enum to hold different video upload states
*/
enum VideoUploadStatus {
IN_PROGRESS,
ADDED_TO_DB,
FILE_UPLOADED,
FINISHED,
FAILED
}
在每个 Runnable 中保存监听器的引用。
public class VideoFileUploadRunner implements Runnable {
private final IVideoUploadListener mUploadListener;
private final VideoFileSync mVideoFileSync;
private final DataService mDataService;
private Handler mHandler;
// etc...
}
通过构造函数传递一个接口实例
public VideoFileUploadRunner(IVideoUploadListener pUploadListener, VideoFileSync pVideoFileSync, DataService pDataService) {
mUploadListener = pUploadListener;
mVideoFileSync = pVideoFileSync;
mDataService = pDataService;
}
在 运行 方法中,post 根据需要更新侦听器。
@Override
public void run() {
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.IN_PROGRESS);
try {
if (mVideoFileSync.get_idPrimaryKey() < 0) {
addEntryToDataBase();
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.ADDED_TO_DB);
}
FileUploader uploader = new FileUploader();
uploader.uploadFile(mVideoFileSync.getVideoFile());
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FILE_UPLOADED);
// Other logic here...
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FINISHED);
}
catch (Exception e) {
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FAILED);
e.printStackTrace();
}
}
onStatusChanged() 方法的侦听器实现应该同步。这将有助于避免竞争条件导致的错误结果。
private IVideoUploadListener mUploadListener = new IVideoUploadListener() {
@Override
public synchronized void onStatusChanged(int pUploadId, VideoUploadStatus pStatus) {
Log.i("ListenerTag", "Video file with ID " + pUploadId + " has the status " + pStatus.toString());
}
};
我正在使用 ThreadPoolExecutor
在后台执行多个长 运行 任务,ThreadPoolExecutor
的池大小为 4,因此当添加超过 4 个任务时,它们将被推送到队列,当 4 个任务之一完成时,从队列中弹出一个任务以供执行。
我想知道有什么方法可以访问 Runnable
当前正在执行但不在队列中的对象,即前 4 个任务。
目标:我想这样做是为了在任何给定点获取任务的当前状态,在 mThreadPoolExecutor.getQueue()
的帮助下,我正在访问正在排队并准备执行的任务,请建议我访问任务的方法这些目前正在执行,以便我可以在需要时附加和删除 listener/handler。
我的可运行class:
public class VideoFileUploadRunner implements Runnable {
private final VideoFileSync mVideoFileSync;
private final DataService dataService;
private Handler handler;
public VideoFileUploadRunner(VideoFileSync videoFileSync, DataService dataService) {
this.mVideoFileSync = videoFileSync;
this.dataService = dataService;
}
public int getPK()
{
return mVideoFileSync.get_idPrimaryKey();
}
public void setHandler(Handler handler) {
this.handler = handler;
}
@Override
public void run() {
try {
if (mVideoFileSync.get_idPrimaryKey() < 0) {
addEntryToDataBase();
}
updateStatus(VideoUploadStatus.IN_PROGRESS);
FileUploader uploader = new FileUploader();
updateStatus(uploader.uploadFile(mVideoFileSync.getVideoFile()));
} catch (Exception e) {
updateStatus(VideoUploadStatus.FAILED);
e.printStackTrace();
}
}
private void addEntryToDataBase() {
int pk = dataService.saveVideoRecordForSync(mVideoFileSync);
mVideoFileSync.set_idPrimaryKey(pk);
}
private void updateStatus(VideoUploadStatus status) {
if (handler != null) {
Message msg = new Message();
Bundle b = new Bundle();
b.putString(AppConstants.Sync_Status, status.toString());
msg.setData(b);
handler.sendMessage(msg);
}
dataService.updateUploadStatus(mVideoFileSync.get_idPrimaryKey(), status.toString());
}
}
在任务进度列表视图中:
public void setData(VideoFileSync fileSync) {
tvIso.setText(fileSync.getVideoFile().getISO_LOOP_EQUP());
tvUnit.setText(fileSync.getVideoFile().getUnit());
tvName.setText(fileSync.getVideoFile().getLocalPath());
tvStatus.setText(fileSync.getCurentStatus().toString());
addHandleForUpdate(fileSync);
}
private void addHandleForUpdate(VideoFileSync fileSync) {
Handler.Callback callBack = new Handler.Callback() {
@Override
public boolean handleMessage(Message msg) {
if(msg.getData()!=null)
{
tvStatus.setText(msg.getData().getString(AppConstants.Sync_Status));
}
return false;
}
};
mHadler = new Handler(Looper.getMainLooper(),callBack);
VideoFileUploadRunner runner = VideoUploadManager.getInstance().getRunnerForSyncFile(fileSync);
if(runner!=null)
runner.setHandler(mHadler);
}
在 VideoUploadManager 中,我有以下 return Runnable
对象的方法,在这里我需要帮助,以便我可以 return 当前正在执行的任务。
public synchronized VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
Iterator<Runnable> itr = mThreadPoolExecutor.getQueue().iterator();
while (itr.hasNext()) {
VideoFileUploadRunner runner = (VideoFileUploadRunner) itr.next();
if (runner.getPK() == fileSync.get_idPrimaryKey()) {
return runner;
}
}
return null;
}
最好的方法是公开一个同步变量,其中包含有关当前正在执行的任务的信息。
public MyTask implements Runnable {
private String id;
private Map<String, MyTask> mapTasks;
public MyTask(String id, Map<String, MyTask> mapTasks) {
this.id = id;
this.mapTasks = mapTasks;
}
public void run() {
synchronized(mapTasks) {
mapTasks.put(id, this);
}
...
synchronized(mapTasks) {
mapTasks.remove(id);
}
}
}
// Create a map of tasks
Map<String, MyTask> mapTasks = new HashMap<String, MyTask>();
// How to create tasks
MyTask myTask1 = new MyTask("task1", mapTasks);
MyTask myTask2 = new MyTask("task2", mapTasks);
executorService.execute(myTask1);
executorService.execute(myTask2);
....
并打印当前正在执行的任务列表:
public void printCurrentExecutingTasks(Map<String, MyTask> tasks) {
for (String id: tasks.keySet()) {
System.out.println("Executing task with id: " + id);
}
}
我的回答集中在问题上:"how to know which runnables are being executed".
这种方法保留一组并发的活动 Runnable:
private final Set<VideoFileUploadRunner> active = Collections.newSetFromMap(new ConcurrentHashMap<>());
提交给 ThreadPoolExecutor 的 Runnable 应该用更新此集合的 Runnable 装饰:
class DecoratedRunnable implements Runnable {
final VideoFileUploadRunner runnable;
public DecoratedRunnable(VideoFileUploadRunner runnable) {
this.runnable = runnable;
}
@Override
public void run() {
active.add(runnable); // add to set
try {
runnable.run();
} finally {
active.remove(runnable); // finally remove from set (even when something goes wrong)
}
}
}
所以我们可以在提交之前装饰 VideoFileUploadRunner
个实例:
executorService.submit(new DecoratedRunnable(videoFileUploadRunner));
方法 getRunnerForSyncFile
将像这样简单地实现:
public VideoFileUploadRunner getRunnerForSyncFile(VideoFileSync fileSync) {
return active.stream()
.filter(videoFileUploadRunner -> videoFileUploadRunner.getPK() == fileSync.get_idPrimaryKey())
.findAny()
.orElse(null);
}
Remark :正如@Charlie 评论的那样,这不是将侦听器附加到 Runnable 的最佳方式。您可以请求从 VideoFileUploadRunner
的 run()
方法内部设置消息处理程序,或者使用 MessageHandler 集初始化此类实例,或者使用这种装饰方法使其远离 VideoFileUploadRunner
class.
这个答案与我上面的评论有关。
与其尝试通过执行器找到 运行nable 并为其附加一个侦听器,不如在创建 运行nable 和 post 事件时将侦听器绑定到 运行nable从 运行nable 的执行代码到监听器。
只有当前活跃的 运行 启用才会 post 他们的更新。
这是一个例子。
为您的侦听器创建一个接口来实现。您的监听器可以是线程池执行器、私有内部 class 等
/**
* Callback interface to notify when a video upload's state changes
*/
interface IVideoUploadListener {
/**
* Called when a video upload's state changes
* @param pUploadId The ID of the video upload
* @param pStatus The new status of the upload
*/
void onStatusChanged(int pUploadId, VideoUploadStatus pStatus);
}
为您的状态类型创建枚举(例如)
/**
* Enum to hold different video upload states
*/
enum VideoUploadStatus {
IN_PROGRESS,
ADDED_TO_DB,
FILE_UPLOADED,
FINISHED,
FAILED
}
在每个 Runnable 中保存监听器的引用。
public class VideoFileUploadRunner implements Runnable {
private final IVideoUploadListener mUploadListener;
private final VideoFileSync mVideoFileSync;
private final DataService mDataService;
private Handler mHandler;
// etc...
}
通过构造函数传递一个接口实例
public VideoFileUploadRunner(IVideoUploadListener pUploadListener, VideoFileSync pVideoFileSync, DataService pDataService) {
mUploadListener = pUploadListener;
mVideoFileSync = pVideoFileSync;
mDataService = pDataService;
}
在 运行 方法中,post 根据需要更新侦听器。
@Override
public void run() {
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.IN_PROGRESS);
try {
if (mVideoFileSync.get_idPrimaryKey() < 0) {
addEntryToDataBase();
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.ADDED_TO_DB);
}
FileUploader uploader = new FileUploader();
uploader.uploadFile(mVideoFileSync.getVideoFile());
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FILE_UPLOADED);
// Other logic here...
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FINISHED);
}
catch (Exception e) {
mUploadListener.onStatusChanged(getPrimaryKey(), VideoUploadStatus.FAILED);
e.printStackTrace();
}
}
onStatusChanged() 方法的侦听器实现应该同步。这将有助于避免竞争条件导致的错误结果。
private IVideoUploadListener mUploadListener = new IVideoUploadListener() {
@Override
public synchronized void onStatusChanged(int pUploadId, VideoUploadStatus pStatus) {
Log.i("ListenerTag", "Video file with ID " + pUploadId + " has the status " + pStatus.toString());
}
};