如何跟踪多个服务器中异步任务 运行 的进度状态
How to track progress status of async tasks running in multiple servers
我在 spring 中有多个异步任务 运行 boot.These 任务读取 excel 文件并将所有数据插入数据库。
当从前端发出请求时,任务开始。然后前端会周期性的不断请求任务的进度。
我需要跟踪每个任务的进度并知道它们何时完成。
这是接收任务请求和轮询其进度状态的控制器文件:
public class TaskController {
@RequestMapping(method = RequestMethod.POST, value = "/uploadExcel")
public ResponseEntity<?> uploadExcel(String excelFilePath) {
String taskId = UUID.randomUUID().toString();
taskAsyncService.AsyncManager(id, excelFilePath);
HashMap<String, String> responseMap = new HashMap<>();
responeMap.put("taskId",taskId);
return new ResponseEntity<>(responseMap, HttpStatus.ACCEPTED);
}
// This will be polled to get progress of tasks being executed
@RequestMapping(method = RequestMethod.GET, value = "/tasks/progress/{id}")
public ResponseEntity<?> getTaskProgress(@PathVariable String taskId) {
HashMap<String, String> map = new HashMap<>();
if (taskAsyncService.containsTaskEntry(id) == null) {
map.put("Error", "TaskId does not exist");
return new ResponseEntity<>(map, HttpStatus.BAD_REQUEST);
}
boolean taskProgress = taskAsyncService.getTaskProgress(taskId);
if (taskProgress) {
map.put("message", "Task complete");
taskAsyncService.removeTaskProgressEntry(taskId);
return new ResponseEntity<>(map, HttpStatus.OK);
}
//Otherwise task is still running
map.put("progressStatus", "Task running");
return new ResponseEntity<>(map, HttpStatus.PARTIAL_CONTENT);
}
}
这是执行异步任务的代码。
public class TaskAsyncService {
private final AtomicReference<ConcurrentHashMap<String, Boolean>> isTaskCompleteMap = new AtomicReference<ConcurrentHashMap<String, Boolean>>();
protected boolean containsTaskEntry(String taskId) {
if (isTaskCompleteMap.get().get(taskId) != null) {
return true;
}
return false;
}
protected boolean getTaskProgress(String taskId, String excelFilePath) {
return isTaskCompleteMap.get().get(taskId);
}
protected void removeTaskProgressEntry(String taskId) {
if (isTaskCompleteMap.get() != null) {
isTaskCompleteMap.get().remove(taskId);
}
}
@Async
public CompletableFuture<?> AsyncManager(String taskId) {
HashMap<String, String> map = new HashMap<>();
//Add a new entry into isTaskCompleteMap
isTaskCompleteMap.get().put(taskId, false);
//Insert excel rows into database
//Task completed set value to true
isTaskCompleteMap.get().put(taskId, true);
map.put("Success", "Task completed");
return CompletableFuture.completedFuture(map);
}
}
I am using AWS EC2 with a load balancer. Therefore, sometimes a
polling request gets handled by a newly spawned server which cannot
access the isTaskCompleteMap and returns saying that "TaskId does not exist".
在这种情况下如何跟踪任务的状态?我知道我需要一个分布式数据结构,但不了解它的种类和实现方式。
您可以使用 Hazelcast 或类似的分布式解决方案(Redis 等)。
地图 - https://docs.hazelcast.org/docs/3.0/manual/html/ch02.html#Map
- 使用来自 hazelcast 而不是 CHM 的分布式地图。
- 从这样的映射中获取应该 return 任务,即使它们正在另一个 pod(服务器)上处理
我在 spring 中有多个异步任务 运行 boot.These 任务读取 excel 文件并将所有数据插入数据库。
当从前端发出请求时,任务开始。然后前端会周期性的不断请求任务的进度。
我需要跟踪每个任务的进度并知道它们何时完成。
这是接收任务请求和轮询其进度状态的控制器文件:
public class TaskController {
@RequestMapping(method = RequestMethod.POST, value = "/uploadExcel")
public ResponseEntity<?> uploadExcel(String excelFilePath) {
String taskId = UUID.randomUUID().toString();
taskAsyncService.AsyncManager(id, excelFilePath);
HashMap<String, String> responseMap = new HashMap<>();
responeMap.put("taskId",taskId);
return new ResponseEntity<>(responseMap, HttpStatus.ACCEPTED);
}
// This will be polled to get progress of tasks being executed
@RequestMapping(method = RequestMethod.GET, value = "/tasks/progress/{id}")
public ResponseEntity<?> getTaskProgress(@PathVariable String taskId) {
HashMap<String, String> map = new HashMap<>();
if (taskAsyncService.containsTaskEntry(id) == null) {
map.put("Error", "TaskId does not exist");
return new ResponseEntity<>(map, HttpStatus.BAD_REQUEST);
}
boolean taskProgress = taskAsyncService.getTaskProgress(taskId);
if (taskProgress) {
map.put("message", "Task complete");
taskAsyncService.removeTaskProgressEntry(taskId);
return new ResponseEntity<>(map, HttpStatus.OK);
}
//Otherwise task is still running
map.put("progressStatus", "Task running");
return new ResponseEntity<>(map, HttpStatus.PARTIAL_CONTENT);
}
}
这是执行异步任务的代码。
public class TaskAsyncService {
private final AtomicReference<ConcurrentHashMap<String, Boolean>> isTaskCompleteMap = new AtomicReference<ConcurrentHashMap<String, Boolean>>();
protected boolean containsTaskEntry(String taskId) {
if (isTaskCompleteMap.get().get(taskId) != null) {
return true;
}
return false;
}
protected boolean getTaskProgress(String taskId, String excelFilePath) {
return isTaskCompleteMap.get().get(taskId);
}
protected void removeTaskProgressEntry(String taskId) {
if (isTaskCompleteMap.get() != null) {
isTaskCompleteMap.get().remove(taskId);
}
}
@Async
public CompletableFuture<?> AsyncManager(String taskId) {
HashMap<String, String> map = new HashMap<>();
//Add a new entry into isTaskCompleteMap
isTaskCompleteMap.get().put(taskId, false);
//Insert excel rows into database
//Task completed set value to true
isTaskCompleteMap.get().put(taskId, true);
map.put("Success", "Task completed");
return CompletableFuture.completedFuture(map);
}
}
I am using AWS EC2 with a load balancer. Therefore, sometimes a polling request gets handled by a newly spawned server which cannot access the isTaskCompleteMap and returns saying that "TaskId does not exist".
在这种情况下如何跟踪任务的状态?我知道我需要一个分布式数据结构,但不了解它的种类和实现方式。
您可以使用 Hazelcast 或类似的分布式解决方案(Redis 等)。
地图 - https://docs.hazelcast.org/docs/3.0/manual/html/ch02.html#Map
- 使用来自 hazelcast 而不是 CHM 的分布式地图。
- 从这样的映射中获取应该 return 任务,即使它们正在另一个 pod(服务器)上处理