How can the caller thread wait until a periodic task under ScheduledExecutorService has finished its job?

I have a requirement to refresh a value every 45 minutes by making an HTTP call.

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class TokenManagerRunnable implements Runnable{
    
    String token;

    
    public String fetchToken() {
        return this.token;
    }
    

    @Override
    public void run() {
        
        String result = "";
        
        HttpPost post = new HttpPost("https://login.microsoftonline.com/{{TenentId}}/oauth2/token");
        List<NameValuePair> urlParameters = new ArrayList<>();
        urlParameters.add(new BasicNameValuePair("grant_type", "client_credentials"));
        urlParameters.add(new BasicNameValuePair("client_id", "some client id"));
        urlParameters.add(new BasicNameValuePair("client_secret", "some secret id"));
        urlParameters.add(new BasicNameValuePair("resource", "https://database.windows.net"));

        try {
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(post)){

            result = EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        
        ObjectNode node;
        try {
            node = new ObjectMapper().readValue(result, ObjectNode.class);
        
        
        System.out.println(result);
        
        if (node.has("access_token")) {
            result = node.get("access_token").toString();           
        }
        System.out.println(result);
        System.out.println(result.substring(1, result.length()-1));
        
        
        //updating the token
        this.token = result.substring(1, result.length()-1);
        
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

This is my main method:

        SQLServerDataSource ds = new SQLServerDataSource();
        TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
        ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
        sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
        System.out.println("fetching the token ---- "+tokenManagerRunnable.fetchToken());
        ds.setAccessToken(tokenManagerRunnable.fetchToken());
        try {
            Connection connection = ds.getConnection();
            Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery("select * from [dbo].[CLIENT]");
            System.out.println("You have successfully logged in");

            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

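tokenManagerRunnable.fetchToken() returns null, because the TokenManagerRunnable task has not run yet at that point.

How can we wait until the ScheduledExecutorService has completed the task, so that every 45 minutes we get an actual value (not null) from tokenManagerRunnable.fetchToken() and set the new value on the Datasource?

Any ideas?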

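As you know, this is a synchronization problem. You have mainly two ways to synchronize threads:

  • synchronously, using join,
  • asynchronously, using callbacks.

Given the asynchronous nature of the repeating task, I think your best option is to use a callback. That way you can set the new token value every time it is retrieved.

I have updated your code below: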

// Imports [...]

public class TokenManagerRunnable implements Runnable {
    private final SQLServerDataSource ds;

    /**
     * New constructor taking a datasource to trigger the persistence of the token
     * on retrieval.
     * @param ds
     */
    public TokenManagerRunnable(SQLServerDataSource ds) {
        this.ds = ds;
    }

    /**
     * New method persisting the token on datasource when retrieved.
     * @param token
     */
    private void setToken(String token) {
        System.out.println("fetching the token ---- " + token);
        this.ds.setAccessToken(token);
    }

    @Override
    public void run() {
        // Retrieval [...]
        try {
            // Parsing [...]
            //updating the token
            this.setToken(result.substring(1, result.length() - 1));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

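As you can see, the runner does not need any state of its own, because it passes the result straight to the datasource. You only have to provide this datasource to the runner on construction.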

SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);

// Run 1 time synchronously, at the end of this run call
// the token will be set
tokenManagerRunnable.run();
        
// Schedule a token refresh every 45 minutes; the first scheduled refresh is
// delayed by 45 minutes because the token was just fetched synchronously above
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 45, 45, TimeUnit.MINUTES);

// Use the token [...]

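Edit

As you stated in the comments, the Runnable has to have executed before you can contact your database. You need to either do this synchronously or move the database code into a separate thread, depending on what you intend to do with your application. The underlying question is: does your application depend on this initialization?

In fact, you can call the run() method without putting it in a thread, which simply runs your token update synchronously. This means you need to call tokenManagerRunnable.run(); synchronously before the automatic refresh starts in the scheduled thread.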
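
To make the callback idea testable without a real datasource, here is a minimal sketch. TokenRefresher and CallbackDemo are hypothetical names, the Supplier stands in for the HTTP call, and the AtomicReference stands in for the datasource (in the real code the callback would be something like ds::setAccessToken). It also assumes you want a failed refresh to be logged rather than rethrown, because an exception that escapes a task scheduled with scheduleWithFixedDelay suppresses all later executions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical refresher: fetches a token and hands it to a callback. */
class TokenRefresher implements Runnable {
    private final Supplier<String> fetcher;  // stands in for the HTTP call
    private final Consumer<String> onToken;  // e.g. ds::setAccessToken

    TokenRefresher(Supplier<String> fetcher, Consumer<String> onToken) {
        this.fetcher = fetcher;
        this.onToken = onToken;
    }

    @Override
    public void run() {
        try {
            onToken.accept(fetcher.get());
        } catch (RuntimeException e) {
            // Caught here so that one failed refresh does not cancel the schedule.
            e.printStackTrace();
        }
    }
}

class CallbackDemo {
    public static void main(String[] args) {
        // Stands in for the datasource that receives the token.
        AtomicReference<String> holder = new AtomicReference<>();
        TokenRefresher refresher =
                new TokenRefresher(() -> "token-" + System.nanoTime(), holder::set);

        // Synchronous first fetch: the holder is populated when run() returns.
        refresher.run();
        System.out.println("initial token set: " + (holder.get() != null));

        // Periodic refresh on a scheduler thread, first one after 45 minutes.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(refresher, 45, 45, TimeUnit.MINUTES);

        // ... use the token ...
        scheduler.shutdownNow(); // demo only, so the JVM can exit
    }
}
```

Injecting the fetcher and the callback keeps the scheduling logic independent of the HTTP client and of SQLServerDataSource, which is a design choice of this sketch and not something the original code requires.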

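If I understood your question correctly, you can use a CompletableFuture. You wrap the token in a CompletableFuture and the scheduled thread completes it. Since a CompletableFuture is a Future, other threads can wait on it for the result.

Below is a basic implementation that illustrates the mechanism.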

import java.util.concurrent.CompletableFuture;

class Main {
    
    static CompletableFuture<String> token = new CompletableFuture<>();
    
    
    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100_000_000; i++) {
                Math.log(Math.sqrt(i));
                if (i % 10_000_000 == 0) {
                    System.out.println("doing stuff");
                }
            }
            token.complete("result");
        }).start();
        
        String result = token.join(); // wait until token is set and get it
        System.out.println("got " + result);
    }
}

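Keep in mind that you have to assign a new CompletableFuture to token after you have obtained the result, because a CompletableFuture can only be completed once.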
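
One possible way to handle that re-assignment is sketched below. TokenHolder is a hypothetical class, not part of any library, and the sketch assumes a single publishing thread (for example a single-threaded scheduler). The first publish completes the initial future, and every later publish swaps in a fresh, already-completed future, so readers block only until the first token exists and afterwards always see the latest one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder: readers block until the first token, then see the latest one. */
class TokenHolder {
    private final AtomicReference<CompletableFuture<String>> current =
            new AtomicReference<>(new CompletableFuture<>());

    /** Called by the (single) refresh thread each time a new token arrives. */
    void publish(String token) {
        // complete() returns false when the future was already completed;
        // in that case replace it with a new future that already holds the token.
        if (!current.get().complete(token)) {
            current.set(CompletableFuture.completedFuture(token));
        }
    }

    /** Blocks until the first token has been published, then returns the latest one. */
    String await() {
        return current.get().join();
    }
}

class TokenHolderDemo {
    public static void main(String[] args) {
        TokenHolder holder = new TokenHolder();

        // Simulates the scheduler thread delivering the first token.
        new Thread(() -> holder.publish("first")).start();
        System.out.println("got " + holder.await());

        // Simulates a later refresh.
        holder.publish("second");
        System.out.println("got " + holder.await());
    }
}
```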

I was able to get it working with the following code:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
 
import java.util.ArrayList;
import java.util.List;
 
public class AzureServicePrincipleTokenManager implements Runnable {
 
 
    SQLServerDataSource ds ;
    String secret;
    String clientId;
    String tenetId;
    static final String RESOURCE = "https://database.windows.net";
    static final String ENDPOINT = "https://login.microsoftonline.com/";
    static final String GRANT_TYPE = "client_credentials";
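    // volatile so that a thread polling getConfirmTokenFlag() sees the update made by the scheduler thread
    volatile boolean confirmTokenFlag = false;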
    private static Log logger = LogFactory.getLog( "AzureServicePrincipleTokenManager" );
 
    public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
        this.ds = ds;
        this.secret = secret;
        this.clientId = clientId;
        this.tenetId = tenetId;
    }
 
    public boolean getConfirmTokenFlag(){
        return this.confirmTokenFlag;
    }
 
    private void setAccessToken(String token){
        this.ds.setAccessToken(token);
    }
 
 
 
 
    @Override
    public void run() {
        logger.info("Fetching Service Principle accessToken");
        String result = "";
        HttpPost post = new HttpPost(ENDPOINT+this.tenetId+"/oauth2/token");
        List<NameValuePair> urlParameters = new ArrayList<>();
        urlParameters.add(new BasicNameValuePair("grant_type", GRANT_TYPE));
        urlParameters.add(new BasicNameValuePair("client_id", this.clientId));
        urlParameters.add(new BasicNameValuePair("client_secret", this.secret));
        urlParameters.add(new BasicNameValuePair("resource", RESOURCE));
 
        try {
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
            // try-with-resources closes both the client and the response
            try (CloseableHttpClient httpClient = HttpClients.createDefault();
                 CloseableHttpResponse response = httpClient.execute(post)) {
                result = EntityUtils.toString(response.getEntity());
            }
            ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);

            if (node.has("access_token")) {
                // asText() returns the value without the surrounding JSON quotes
                this.setAccessToken(node.get("access_token").asText());
                confirmTokenFlag = true;
                logger.info("set confirmTokenFlag true");
            } else {
                logger.error("Token response did not contain access_token");
            }
        } catch (Exception ex) {
            // Log and return normally so that the scheduler keeps running the task
            logger.error(ex.getMessage(), ex);
        }
 
 
    }
} 

The caller would look like this:

        SQLServerDataSource ds = new SQLServerDataSource();

        AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds,"your tenentID","your clientID","your secret");
        ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
        sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
        logger.info("----ExecutorService started the Runnable task");

        // Busy-wait until the first token has been set
        while (!azureServicePrincipleTokenManager.getConfirmTokenFlag()) {
            // With an empty loop body the flag was never observed as true,
            // so ds.getAccessToken() is called here intentionally.
            ds.getAccessToken();
        }
        logger.info("----got the token after setting up  " + ds.getAccessToken());
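
As an alternative to polling a flag in a loop, the caller can block on a CountDownLatch that the task releases after its first successful run. The following is only a sketch: FirstRunSignal and LatchDemo are hypothetical names, the lambda stands in for the real token fetch, and the 30 second timeout is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that signals when the wrapped task has completed its first run. */
class FirstRunSignal implements Runnable {
    private final Runnable delegate;
    private final CountDownLatch firstRunDone = new CountDownLatch(1);

    FirstRunSignal(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        delegate.run();
        // Reached only if the delegate returned normally; a no-op after the first run.
        firstRunDone.countDown();
    }

    /** Waits up to the timeout; returns false on timeout or if the waiting thread is interrupted. */
    boolean awaitFirstRun(long timeout, TimeUnit unit) {
        try {
            return firstRunDone.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}

class LatchDemo {
    public static void main(String[] args) {
        String[] token = new String[1]; // stands in for the datasource's token
        FirstRunSignal task = new FirstRunSignal(() -> token[0] = "fresh-token");

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(task, 0, 45, TimeUnit.MINUTES);

        // Blocks without spinning until the first run has finished (or 30 s elapse).
        if (task.awaitFirstRun(30, TimeUnit.SECONDS)) {
            System.out.println("token ready: " + token[0]);
        } else {
            System.out.println("timed out waiting for the first token");
        }
        scheduler.shutdownNow(); // demo only, so the JVM can exit
    }
}
```

The latch also takes care of visibility: everything the task wrote before countDown() is visible to the caller once await() returns, so no extra volatile flag is needed for the handoff.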