如何使用java可调用接口异步处理队列项?

How to use java callable interface to asynchronously process queue items?

我曾经实现 Runnable 接口到 peek() 队列中的项目并将其发送到 API。

但现在我需要使用 Callable 接口到 peek() 队列并将项目发送到 API。如果 return 200,则从队列中删除该项目。

这是我用来实现此功能的代码。我该如何修改代码?有关于此的任何示例或参考吗?谢谢。

public class QueueProcessor implements Runnable{

private static ObjectQueue<JSONObject> objectQueue;

static {
 objectQueue = new ObjectQueue<JSONObject>();
}

public void run() {

//add items to the queue
   objectQueue.add(jsonObeject)
    Random r = new Random();
    try {
        while (true) {
            try {
                if (!objectQueue.isEmpty()) {
                    JSONObject o = objectQueue.remove();
                    sendRequest(o.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
        Thread.currentThread().interrupt();
    } 
}

 public void sendRequest(JSONObject json) {

 Client client = ClientBuilder.newClient();
 WebTarget baseTarget = client.target("someUrl");
 Invocation.Builder builder = baseTarget.request();
 Response response = builder.post(Entity.entity(json.toString(), MediaType.APPLICATION_JSON));

 int code = response.getStatus();   
 if (200 == code) {
    objectQueue.remove();
 }  

}

为了让您入门,请参阅 this other SO question

并注意问题本身的第 1 点。

要实现异步调用,您首先需要解耦 - 任务提交/执行(从队列中选择项目并进行 API 调用)和 API 调用后的响应处理(从队列中删除项目,如果响应状态为 200) 。这种解耦可以通过 - ExecutorService

来实现

因此,首先将 ExecutorService 引入您的 Runnable 代码,即从某个控制器 class 开始执行您的 Runnable(class 使用 main 方法),它使用Executor 到 submit/execute 请求。您还没有显示如何触发您的线程,所以您可能已经在这样做了。

现在将您的 Runnable 更改为 Callable<Response> 即创建一个类似于您的 Runnable 的 Callable 并实现 Callable<Response> 并在 call() 方法中,使您的 API 称呼。您确实需要与主控制器 class 和此 Callable 共享您的 ObjectQueue<JSONObject>,以便队列实现需要线程安全,或者您需要使 call() 方法线程安全。

我的意思是,您要么在控制器中绕过队列并继续提交对每个项目的请求,要么将整个队列传递给 Callble 并在那里完成循环 - 由您决定。

到目前为止要注意的是 call() 方法 return 是一个值 - CallableRunnablerun() 方法不是 return任何值,这是两者之间的主要区别。

现在回到控制器 class - 提交或执行方法会将您的 Response 包装成 Future submit

现在在 Future 上结合使用 isDone()get() 方法从队列中删除项目。

请记住,您应该能够从 API 响应中识别队列中的已处理对象 - 如果不是,那么您需要将 API 响应与提交的 JSONObject 组合起来并将其包装在Future 找出要删除的对象。只有状态是不够的,如果队列被限制为仅删除顶部元素,您可能需要另一个数据结构来保存对象。如果您只是将 runnable 替换为 callable 但不希望使您的程序真正异步,则不会出现这种复杂情况。

这些只是广泛的指导方针,提供现成的代码是我不会做的。如果您的基础知识正确,您会在 Internet 上找到很多示例。另外,请在粘贴代码时练习包含 import 语句。

几个链接

How to send parallel GET requests and wait for result responses?

How to send multiple asynchronous requests to different web services?