在多个 java 线程之间共享数据并获取更新值
Sharing data between multiple java threads and get the updated value
我想创建一个 java 应用程序,我们希望在访问令牌的帮助下为多个用户进行休息调用。我每个用户使用 1 个线程。我正在使用的访问令牌有效期为 1 hour.Once 令牌过期,我将收到 401 错误,并且必须更新所有线程的令牌,然后继续。我正在考虑使用我已设为静态的 volatile 变量来更新所有线程。我的要求是,当我在其中一个线程中知道令牌已过期时,我希望所有线程停止处理并等待新令牌生成(这需要几秒钟)。此外,一旦生成,令牌应自动更新,每个线程不会因令牌过期而失败。
下面是我写的示例代码:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Sample {
public static void main(String[] args) {
String[] myStrings = { "User1" , "User2" , "User3" };
ScheduledExecutorService scheduledExecutorService = Executors
.newScheduledThreadPool(myStrings.length);
TokenGenerator.getToken();
for(String str : myStrings){
scheduledExecutorService.scheduleAtFixedRate(new Task(str), 0, 5, TimeUnit.SECONDS);
}
}
}
class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run() {
getResponse(TokenGenerator.token);
}
private void getResponse(String token) {
// Make http calls
// if token expire , call getToken again. Pause all the running threads , and
// update the token for all threads
TokenGenerator.getToken();
}
}
class TokenGenerator {
public static volatile String token;
public static void getToken() {
token = "new Token everytime";
}
}
有没有更好的方法来解决这个问题?上面的代码不满足我的用例,因为一旦线程开始生成新令牌,所有其他线程都不会被暂停。请求提出一些改进建议..
您可以使用以下模式,仅使用其 getter 访问令牌并在收到错误响应时调用 loadToken
。
class TokenGenerator {
private String token = null;
public synchronized String getToken() {
if (token == null) {
loadToken();
}
return token;
}
public synchronized void loadToken() {
token = "load here";
}
}
要解决您暂停线程的问题,您可以在任何您想暂停的地方调用 getToken()
Thread
,如果加载令牌当前处于活动状态,它将自动阻止。
class Task implements Runnable{
private String name;
private TokenGenerator tokenGenerator;
public Task(String name, TokenGenerator tokenGenerator) {
this.name = name;
this.tokenGenerator = tokenGenerator;
}
@Override
public void run() {
getResponse(tokenGenerator.getToken());
}
private void getResponse(String token) {
// Make http calls
// if token expire , call getToken again. Pause all the running threads , and
// update the token for all threads
tokenGenerator.loadToken();
}
}
您可以将令牌放在 AtomicReference and use a Semaphore 中以暂停线程:
public class TokenWrapper {
private final AtomicReference<Token> tokenRef = new AtomicReference<>(null);
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
public TokenWrapper() {
Token newToken = // refresh token
tokenRef.set(newToken);
}
public Token getToken() {
Token token = null;
while((token = tokenRef.get()) == null) {
semaphore.acquire();
}
return token;
}
public Token refreshToken(Token oldToken) {
if(tokenRef.compareAndSet(oldToken, null)) {
semaphore.drainPermits();
Token newToken = // refresh token
tokenRef.set(newToken);
semaphore.release(Integer.MAX_VALUE);
return newToken;
} else return getToken();
}
}
public class RESTService {
private static final TokenWrapper tokenWrapper = new TokenWrapper();
public void run() {
Token token = tokenWrapper.getToken();
Response response = // call service with token
if(response.getStatus == 401) {
tokenWrapper.refreshToken(token);
}
}
}
refreshToken()
在 tokenRef
上使用原子 compareAndSet
来确保只有一个线程会刷新令牌,然后在 semaphore
上调用 drainPermits()
使其他线程等待直到令牌被刷新。 getToken()
returns 令牌,如果它不是 null
,否则等待 semaphore
- 这是在循环中完成的,因为线程可能必须自旋在 tokenRef
被设置为 null
和 drainPermits()
被调用 semaphore
之间的几个周期。
编辑:修改了 refreshToken(Token oldToken)
的签名,以便传入旧令牌而不是在方法内部读取 - 这是为了防止 RESTService_A 刷新令牌的情况,RESTService_B 使用旧的过期令牌获得 401,然后 RESTService_B 在 RESTService_A 对 refreshToken
的调用完成后调用 refreshToken
,导致令牌被刷新两次。使用新签名,RESTService_B 将传入旧的过期令牌,因此当旧令牌无法匹配新令牌时,compareAndSet
调用将失败,导致 refreshToken
仅被调用一次。
由于您需要做两件事(http 调用和更新令牌),您可以尝试两种检查方式。
一个检查令牌是否 is expired or not
,另一个检查是否有其他线程正在尝试 update the token
。
这里用一个小代码来演示这个想法(它的操作方式有点脏,所以可能需要一些清理)
...
private string token
private volatile static isTokenExpired=false //checking if the token is expired or not
private volatile static waitingForTokenRefresher=false; //checking if we should wait for update.
@Override
public void run(){
while(tokenisExpired){
//wait
}
//http calls find out if token is good to go
//check if no one else uses the token:
if( token is actually expired){
if(!waitingForTokenRefresher){
isTokenExpired=true;
waitingForTokenRefresher=true;
//refresh token
waitingForTokenRefresher=false
isTokenExpired=false;
}
}
while(!waitingForTokenRefresher){
//wait...
}
}
我想创建一个 java 应用程序,我们希望在访问令牌的帮助下为多个用户进行休息调用。我每个用户使用 1 个线程。我正在使用的访问令牌有效期为 1 hour.Once 令牌过期,我将收到 401 错误,并且必须更新所有线程的令牌,然后继续。我正在考虑使用我已设为静态的 volatile 变量来更新所有线程。我的要求是,当我在其中一个线程中知道令牌已过期时,我希望所有线程停止处理并等待新令牌生成(这需要几秒钟)。此外,一旦生成,令牌应自动更新,每个线程不会因令牌过期而失败。
下面是我写的示例代码:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Sample {
public static void main(String[] args) {
String[] myStrings = { "User1" , "User2" , "User3" };
ScheduledExecutorService scheduledExecutorService = Executors
.newScheduledThreadPool(myStrings.length);
TokenGenerator.getToken();
for(String str : myStrings){
scheduledExecutorService.scheduleAtFixedRate(new Task(str), 0, 5, TimeUnit.SECONDS);
}
}
}
class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run() {
getResponse(TokenGenerator.token);
}
private void getResponse(String token) {
// Make http calls
// if token expire , call getToken again. Pause all the running threads , and
// update the token for all threads
TokenGenerator.getToken();
}
}
class TokenGenerator {
public static volatile String token;
public static void getToken() {
token = "new Token everytime";
}
}
有没有更好的方法来解决这个问题?上面的代码不满足我的用例,因为一旦线程开始生成新令牌,所有其他线程都不会被暂停。请求提出一些改进建议..
您可以使用以下模式,仅使用其 getter 访问令牌并在收到错误响应时调用 loadToken
。
class TokenGenerator {
private String token = null;
public synchronized String getToken() {
if (token == null) {
loadToken();
}
return token;
}
public synchronized void loadToken() {
token = "load here";
}
}
要解决您暂停线程的问题,您可以在任何您想暂停的地方调用 getToken()
Thread
,如果加载令牌当前处于活动状态,它将自动阻止。
class Task implements Runnable{
private String name;
private TokenGenerator tokenGenerator;
public Task(String name, TokenGenerator tokenGenerator) {
this.name = name;
this.tokenGenerator = tokenGenerator;
}
@Override
public void run() {
getResponse(tokenGenerator.getToken());
}
private void getResponse(String token) {
// Make http calls
// if token expire , call getToken again. Pause all the running threads , and
// update the token for all threads
tokenGenerator.loadToken();
}
}
您可以将令牌放在 AtomicReference and use a Semaphore 中以暂停线程:
public class TokenWrapper {
private final AtomicReference<Token> tokenRef = new AtomicReference<>(null);
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
public TokenWrapper() {
Token newToken = // refresh token
tokenRef.set(newToken);
}
public Token getToken() {
Token token = null;
while((token = tokenRef.get()) == null) {
semaphore.acquire();
}
return token;
}
public Token refreshToken(Token oldToken) {
if(tokenRef.compareAndSet(oldToken, null)) {
semaphore.drainPermits();
Token newToken = // refresh token
tokenRef.set(newToken);
semaphore.release(Integer.MAX_VALUE);
return newToken;
} else return getToken();
}
}
public class RESTService {
private static final TokenWrapper tokenWrapper = new TokenWrapper();
public void run() {
Token token = tokenWrapper.getToken();
Response response = // call service with token
if(response.getStatus == 401) {
tokenWrapper.refreshToken(token);
}
}
}
refreshToken()
在 tokenRef
上使用原子 compareAndSet
来确保只有一个线程会刷新令牌,然后在 semaphore
上调用 drainPermits()
使其他线程等待直到令牌被刷新。 getToken()
returns 令牌,如果它不是 null
,否则等待 semaphore
- 这是在循环中完成的,因为线程可能必须自旋在 tokenRef
被设置为 null
和 drainPermits()
被调用 semaphore
之间的几个周期。
编辑:修改了 refreshToken(Token oldToken)
的签名,以便传入旧令牌而不是在方法内部读取 - 这是为了防止 RESTService_A 刷新令牌的情况,RESTService_B 使用旧的过期令牌获得 401,然后 RESTService_B 在 RESTService_A 对 refreshToken
的调用完成后调用 refreshToken
,导致令牌被刷新两次。使用新签名,RESTService_B 将传入旧的过期令牌,因此当旧令牌无法匹配新令牌时,compareAndSet
调用将失败,导致 refreshToken
仅被调用一次。
由于您需要做两件事(http 调用和更新令牌),您可以尝试两种检查方式。
一个检查令牌是否 is expired or not
,另一个检查是否有其他线程正在尝试 update the token
。
这里用一个小代码来演示这个想法(它的操作方式有点脏,所以可能需要一些清理)
...
private string token
private volatile static isTokenExpired=false //checking if the token is expired or not
private volatile static waitingForTokenRefresher=false; //checking if we should wait for update.
@Override
public void run(){
while(tokenisExpired){
//wait
}
//http calls find out if token is good to go
//check if no one else uses the token:
if( token is actually expired){
if(!waitingForTokenRefresher){
isTokenExpired=true;
waitingForTokenRefresher=true;
//refresh token
waitingForTokenRefresher=false
isTokenExpired=false;
}
}
while(!waitingForTokenRefresher){
//wait...
}
}