调用者线程如何等待 ScheduledExecutorService 下的任务定期完成作业
How the caller thread wait till task under ScheduledExecutorService finish the job periodicaly
我有一个要求,比如在进行 http 调用后必须更新每 45 分钟的值。
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class TokenManagerRunnable implements Runnable{
String token;
public String fetchToken() {
return this.token;
}
@Override
public void run() {
String result = "";
HttpPost post = new HttpPost("https://login.microsoftonline.com/{{TenentId}}/oauth2/token");
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("grant_type", "client_credentials"));
urlParameters.add(new BasicNameValuePair("client_id", "some client id"));
urlParameters.add(new BasicNameValuePair("client_secret", "some secret id"));
urlParameters.add(new BasicNameValuePair("resource", "https://database.windows.net"));
try {
post.setEntity(new UrlEncodedFormEntity(urlParameters));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try (CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post)){
result = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ObjectNode node;
try {
node = new ObjectMapper().readValue(result, ObjectNode.class);
System.out.println(result);
if (node.has("access_token")) {
result = node.get("access_token").toString();
}
System.out.println(result);
System.out.println(result.substring(1, result.length()-1));
//updating the token
this.token = result.substring(1, result.length()-1);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这是我的主要功能
SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
System.out.println("fetching the token ---- "+tokenManagerRunnable.fetchToken());
ds.setAccessToken(tokenManagerRunnable.fetchToken());
try {
Connection connection = ds.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(" select * from [dbo].[CLIENT]");
System.out.println("You have successfully logged in");
while(rs.next()) {
System.out.println(rs.getString(1));
}
}catch(Exception ex) {
ex.printStackTrace();
}
tokenManagerRunnable.fetchToken() 带来 null,因为 TokenManagerRunnable class 尚未执行。
我们如何实现等到 ScheduledExecutorService 完成任务,以便我们可以每隔 45 分钟从 tokenManagerRunnable.fetchToken() 而不是 null 获取值并在 Datasource 中设置新值?
有什么想法吗?
如您所知,这是一个同步问题。
您主要有两种同步线程的方法:
- 同步使用连接,
- 异步使用回调。
从重复任务的异步性质来看,我认为最好的选择是使用回调。
这样您就可以在每次检索时设置新的令牌值。
我在下面更新了您的代码:
// Imports [...]
public class TokenManagerRunnable implements Runnable {
private final SQLServerDataSource ds;
/**
* New constructor taking a datasource to trigger the persistence of the token
* on retrieval.
* @param ds
*/
public TokenManagerRunnable(SQLServerDataSource ds) {
this.ds = ds;
}
/**
* New method persisting the token on datasource when retrieved.
* @param token
*/
private void setToken(String token) {
System.out.println("fetching the token ---- " + token);
this.ds.setAccessToken(token);
}
@Override
public void run() {
// Retrieval [...]
try {
// Parsing [...]
//updating the token
this.setToken(result.substring(1, result.length() - 1));
} catch (IOException e) {
e.printStackTrace();
}
}
}
如您所见,您不需要 Runner 的任何状态,因为它会直接将结果流式传输到数据源。
您只需将此数据源提供给 运行 施工人员。
SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);
// Run 1 time synchronously, at the end of this run call
// the token will be set
tokenManagerRunnable.run();
// Schedule a token refresh every 45 minutes starting now
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
// Use the token [...]
编辑
正如您在评论中所述,您需要执行 Runnable
才能联系您的数据库。
您需要同步执行此操作或将下面的代码添加到单独的线程中,具体取决于您打算对应用程序执行的操作。下面的问题是 您的应用程序是否依赖于此初始化?.
事实上,您可以调用 run()
方法而无需将其放入线程中,这将简单地 运行 同步更新您的令牌。
这意味着您需要在计划线程执行中开始自动刷新之前同步调用tokenManagerRunnable.run();
。
如果我答对了你的问题,你可以使用 CompletableFuture
。您将 token
包装在 CompletableFuture
中,调度线程完成它。由于 CompletableFuture
是 Future
,其他线程可以等待结果。
下面是说明该机制的基本实现。
import java.util.concurrent.CompletableFuture;
class Main {
static CompletableFuture<String> token = new CompletableFuture<>();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100_000_000; i++) {
Math.log(Math.sqrt(i));
if (i % 10_000_000 == 0) {
System.out.println("doing stuff");
}
}
token.complete("result");
}).start();
String result = token.join(); // wait until token is set and get it
System.out.println("got " + result);
}
}
请记住,您必须在获得结果后分配 token
一个新的 CompletableFuture
。那是因为它们只能完成一次。
我能够使用以下代码实现
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import java.util.ArrayList;
import java.util.List;
public class AzureServicePrincipleTokenManager implements Runnable {
SQLServerDataSource ds ;
String secret;
String clientId;
String tenetId;
static final String RESOURCE = "https://database.windows.net";
static final String ENDPOINT = "https://login.microsoftonline.com/";
static final String GRANT_TYPE = "client_credentials";
boolean confirmTokenFlag=false;
private static Log logger = LogFactory.getLog( "AzureServicePrincipleTokenManager" );
public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
this.ds = ds;
this.secret = secret;
this.clientId = clientId;
this.tenetId = tenetId;
}
public boolean getConfirmTokenFlag(){
return this.confirmTokenFlag;
}
private void setAccessToken(String token){
this.ds.setAccessToken(token);
}
@Override
public void run() {
logger.info("Fetching Service Principle accessToken");
String result = "";
HttpPost post = new HttpPost(ENDPOINT+this.tenetId+"/oauth2/token");
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("grant_type", GRANT_TYPE));
urlParameters.add(new BasicNameValuePair("client_id", this.clientId));
urlParameters.add(new BasicNameValuePair("client_secret", this.secret));
urlParameters.add(new BasicNameValuePair("resource", RESOURCE));
try{
post.setEntity(new UrlEncodedFormEntity(urlParameters));
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post);
result = EntityUtils.toString(response.getEntity());
ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);
if (node.has("access_token")) {
result = node.get("access_token").toString();
}
}catch (Exception ex){
logger.error(ex.getMessage(),ex);
}
this.setAccessToken(result.substring(1, result.length()-1));
confirmTokenFlag=true;
logger.info("set confirmTokenFlag true");
}
}
来电者会是这样
SQLServerDataSource ds = new SQLServerDataSource();
AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds,"your tenentID","your clientID","your secret");
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
logger.info("----ExecuterService started the Runnable task");
while (azureServicePrincipleTokenManager.getConfirmTokenFlag()!=true){
ds.getAccessToken(); //I wonder If i leave while body balnk it never become true. so intentionally i'm calling ds.getAccessToken();
}
logger.info("----get the token after settingup "+ds.getAccessToken());
我有一个要求,比如在进行 http 调用后必须更新每 45 分钟的值。
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class TokenManagerRunnable implements Runnable{
String token;
public String fetchToken() {
return this.token;
}
@Override
public void run() {
String result = "";
HttpPost post = new HttpPost("https://login.microsoftonline.com/{{TenentId}}/oauth2/token");
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("grant_type", "client_credentials"));
urlParameters.add(new BasicNameValuePair("client_id", "some client id"));
urlParameters.add(new BasicNameValuePair("client_secret", "some secret id"));
urlParameters.add(new BasicNameValuePair("resource", "https://database.windows.net"));
try {
post.setEntity(new UrlEncodedFormEntity(urlParameters));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try (CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post)){
result = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ObjectNode node;
try {
node = new ObjectMapper().readValue(result, ObjectNode.class);
System.out.println(result);
if (node.has("access_token")) {
result = node.get("access_token").toString();
}
System.out.println(result);
System.out.println(result.substring(1, result.length()-1));
//updating the token
this.token = result.substring(1, result.length()-1);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这是我的主要功能
SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
System.out.println("fetching the token ---- "+tokenManagerRunnable.fetchToken());
ds.setAccessToken(tokenManagerRunnable.fetchToken());
try {
Connection connection = ds.getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(" select * from [dbo].[CLIENT]");
System.out.println("You have successfully logged in");
while(rs.next()) {
System.out.println(rs.getString(1));
}
}catch(Exception ex) {
ex.printStackTrace();
}
tokenManagerRunnable.fetchToken() 带来 null,因为 TokenManagerRunnable class 尚未执行。
我们如何实现等到 ScheduledExecutorService 完成任务,以便我们可以每隔 45 分钟从 tokenManagerRunnable.fetchToken() 而不是 null 获取值并在 Datasource 中设置新值?
有什么想法吗?
如您所知,这是一个同步问题。 您主要有两种同步线程的方法:
- 同步使用连接,
- 异步使用回调。
从重复任务的异步性质来看,我认为最好的选择是使用回调。 这样您就可以在每次检索时设置新的令牌值。
我在下面更新了您的代码:
// Imports [...]
public class TokenManagerRunnable implements Runnable {
private final SQLServerDataSource ds;
/**
* New constructor taking a datasource to trigger the persistence of the token
* on retrieval.
* @param ds
*/
public TokenManagerRunnable(SQLServerDataSource ds) {
this.ds = ds;
}
/**
* New method persisting the token on datasource when retrieved.
* @param token
*/
private void setToken(String token) {
System.out.println("fetching the token ---- " + token);
this.ds.setAccessToken(token);
}
@Override
public void run() {
// Retrieval [...]
try {
// Parsing [...]
//updating the token
this.setToken(result.substring(1, result.length() - 1));
} catch (IOException e) {
e.printStackTrace();
}
}
}
如您所见,您不需要 Runner 的任何状态,因为它会直接将结果流式传输到数据源。 您只需将此数据源提供给 运行 施工人员。
SQLServerDataSource ds = new SQLServerDataSource();
TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);
// Run 1 time synchronously, at the end of this run call
// the token will be set
tokenManagerRunnable.run();
// Schedule a token refresh every 45 minutes starting now
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
// Use the token [...]
编辑
正如您在评论中所述,您需要执行 Runnable
才能联系您的数据库。
您需要同步执行此操作或将下面的代码添加到单独的线程中,具体取决于您打算对应用程序执行的操作。下面的问题是 您的应用程序是否依赖于此初始化?.
事实上,您可以调用 run()
方法而无需将其放入线程中,这将简单地 运行 同步更新您的令牌。
这意味着您需要在计划线程执行中开始自动刷新之前同步调用tokenManagerRunnable.run();
。
如果我答对了你的问题,你可以使用 CompletableFuture
。您将 token
包装在 CompletableFuture
中,调度线程完成它。由于 CompletableFuture
是 Future
,其他线程可以等待结果。
下面是说明该机制的基本实现。
import java.util.concurrent.CompletableFuture;
class Main {
static CompletableFuture<String> token = new CompletableFuture<>();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100_000_000; i++) {
Math.log(Math.sqrt(i));
if (i % 10_000_000 == 0) {
System.out.println("doing stuff");
}
}
token.complete("result");
}).start();
String result = token.join(); // wait until token is set and get it
System.out.println("got " + result);
}
}
请记住,您必须在获得结果后分配 token
一个新的 CompletableFuture
。那是因为它们只能完成一次。
我能够使用以下代码实现
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import java.util.ArrayList;
import java.util.List;
public class AzureServicePrincipleTokenManager implements Runnable {
SQLServerDataSource ds ;
String secret;
String clientId;
String tenetId;
static final String RESOURCE = "https://database.windows.net";
static final String ENDPOINT = "https://login.microsoftonline.com/";
static final String GRANT_TYPE = "client_credentials";
boolean confirmTokenFlag=false;
private static Log logger = LogFactory.getLog( "AzureServicePrincipleTokenManager" );
public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
this.ds = ds;
this.secret = secret;
this.clientId = clientId;
this.tenetId = tenetId;
}
public boolean getConfirmTokenFlag(){
return this.confirmTokenFlag;
}
private void setAccessToken(String token){
this.ds.setAccessToken(token);
}
@Override
public void run() {
logger.info("Fetching Service Principle accessToken");
String result = "";
HttpPost post = new HttpPost(ENDPOINT+this.tenetId+"/oauth2/token");
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("grant_type", GRANT_TYPE));
urlParameters.add(new BasicNameValuePair("client_id", this.clientId));
urlParameters.add(new BasicNameValuePair("client_secret", this.secret));
urlParameters.add(new BasicNameValuePair("resource", RESOURCE));
try{
post.setEntity(new UrlEncodedFormEntity(urlParameters));
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(post);
result = EntityUtils.toString(response.getEntity());
ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);
if (node.has("access_token")) {
result = node.get("access_token").toString();
}
}catch (Exception ex){
logger.error(ex.getMessage(),ex);
}
this.setAccessToken(result.substring(1, result.length()-1));
confirmTokenFlag=true;
logger.info("set confirmTokenFlag true");
}
}
来电者会是这样
SQLServerDataSource ds = new SQLServerDataSource();
AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds,"your tenentID","your clientID","your secret");
ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
logger.info("----ExecuterService started the Runnable task");
while (azureServicePrincipleTokenManager.getConfirmTokenFlag()!=true){
ds.getAccessToken(); //I wonder If i leave while body balnk it never become true. so intentionally i'm calling ds.getAccessToken();
}
logger.info("----get the token after settingup "+ds.getAccessToken());