java, OrbitzWorld consul 客户端 - 如何管理 session 和锁?
java, OrbitzWorld consul client - How to manage session and locks?
使用 java OrbitzWorld consul 客户端,我试图通过 acquireLock
方法同步我的 java 应用程序的多个实例。
到目前为止我的代码:
将应用程序注册为 Consul 服务:
private void registerService(Config config) {
String serviceId = config.getService().getId();
String serviceName = config.getService().getName();
long ttl = config.getService().getTtl();
AgentClient agentClient = client.agentClient();
Registration service = ImmutableRegistration.builder()
.id(serviceId)
.name(serviceName)
.check(Registration.RegCheck.ttl(ttl))
.build();
agentClient.register(service);
new HeartBeater(agentClient, serviceId, ttl).start();
}
心跳:
@Override
public void run() {
while(true) {
try {
client.pass(serviceId);
Thread.sleep((Math.max(ttl / 2, 1)));
} catch (NotRegisteredException | InterruptedException e) {}
}
}
上面的代码有效,服务在 consul 中成功刷新。
现在我想知道锁定的实现。
到目前为止我写了什么:
public boolean amILeader() {
// return if current java app is leader
}
private String createSession() {
final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange() {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
kvCache.addListener(map -> {
Value value = map.get(Constants.LEADER_LOCK_KEY);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
}
});
kvCache.start();
}
我被困在这里,因为我不理解这个理论,也没有在文档中找到任何有用的东西。
我的问题:
- session 是通过
acquireLock
方法进行同步所必需的吗?
- 如果是,session 何时以及如何 created/synchronized 结束?
- session 无效是否很常见?根据文档,如果其中一项服务无法发送 ttl,就会发生这种情况,这种情况经常发生。
- 生活服务如何同步创建新的 session?
- 服务如何在锁上同步?
你能提供一些代码示例或填写我的实现吗?
感谢您的任何回复:]
我想我现在明白了。
理论是这样的:
- Consul 会话表示从单个服务到 Consul 的连接。在我的例子中,它代表了我的一个 java 应用程序实例和 Consul
之间的连接
- 会话用于获取锁。当客户端来到 Consul 并想要获取锁时,Consul 将检查是否有任何与之关联的 sessionId。如果没有,Consul 给客户端一个锁,并将客户端 sessionId 与锁相关联。
- 这把锁没什么特别的。它只是保存在 Consul 节点上的 KV 映射中的一个键。
- 您可以检查锁以及是否有任何 sessionId 与其关联,如下所示:
public class SessionFacade {
private String leaderLock;
private String sessionId;
private Consul client;
private Config config;
public SessionFacade(Consul client, Config config) {
this.client = client;
this.config = config;
this.leaderLock = "service/" + config.getService().getName() + "/leader";
this.sessionId = createSession();
new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
watchLeaderLockStateChange(sessionId);
client.keyValueClient().acquireLock(leaderLock, sessionId);
}
public boolean doIPossesLeaderLock() {
Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
if(leaderValue.isPresent()) {
Optional<String> session = leaderValue.get().getSession();
return session.isPresent() && session.get().equals(sessionId);
}
return false;
}
private String createSession() {
int sessionTtl = config.getService().getSessionTtl();
final Session session = ImmutableSession.builder()
.name(config.getService().getName())
.ttl(sessionTtl + "s")
.build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange(String sessionId) {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
kvCache.addListener(map -> {
Value value = map.get(leaderLock);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(leaderLock, sessionId);
}
});
kvCache.start();
}
}
请注意,该代码可能存在错误,因为我尚未对其进行全面测试。
你读完 https://learn.hashicorp.com/consul/developer-configuration/elections 了吗?它在使用 Consul 进行领导选举的应用程序级别介绍了这个场景。
使用 java OrbitzWorld consul 客户端,我试图通过 acquireLock
方法同步我的 java 应用程序的多个实例。
到目前为止我的代码:
将应用程序注册为 Consul 服务:
private void registerService(Config config) {
String serviceId = config.getService().getId();
String serviceName = config.getService().getName();
long ttl = config.getService().getTtl();
AgentClient agentClient = client.agentClient();
Registration service = ImmutableRegistration.builder()
.id(serviceId)
.name(serviceName)
.check(Registration.RegCheck.ttl(ttl))
.build();
agentClient.register(service);
new HeartBeater(agentClient, serviceId, ttl).start();
}
心跳:
@Override
public void run() {
while(true) {
try {
client.pass(serviceId);
Thread.sleep((Math.max(ttl / 2, 1)));
} catch (NotRegisteredException | InterruptedException e) {}
}
}
上面的代码有效,服务在 consul 中成功刷新。
现在我想知道锁定的实现。
到目前为止我写了什么:
public boolean amILeader() {
// return if current java app is leader
}
private String createSession() {
final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange() {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
kvCache.addListener(map -> {
Value value = map.get(Constants.LEADER_LOCK_KEY);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
}
});
kvCache.start();
}
我被困在这里,因为我不理解这个理论,也没有在文档中找到任何有用的东西。
我的问题:
- session 是通过
acquireLock
方法进行同步所必需的吗? - 如果是,session 何时以及如何 created/synchronized 结束?
- session 无效是否很常见?根据文档,如果其中一项服务无法发送 ttl,就会发生这种情况,这种情况经常发生。
- 生活服务如何同步创建新的 session?
- 服务如何在锁上同步?
你能提供一些代码示例或填写我的实现吗? 感谢您的任何回复:]
我想我现在明白了。
理论是这样的:
- Consul 会话表示从单个服务到 Consul 的连接。在我的例子中,它代表了我的一个 java 应用程序实例和 Consul 之间的连接
- 会话用于获取锁。当客户端来到 Consul 并想要获取锁时,Consul 将检查是否有任何与之关联的 sessionId。如果没有,Consul 给客户端一个锁,并将客户端 sessionId 与锁相关联。
- 这把锁没什么特别的。它只是保存在 Consul 节点上的 KV 映射中的一个键。
- 您可以检查锁以及是否有任何 sessionId 与其关联,如下所示:
public class SessionFacade {
private String leaderLock;
private String sessionId;
private Consul client;
private Config config;
public SessionFacade(Consul client, Config config) {
this.client = client;
this.config = config;
this.leaderLock = "service/" + config.getService().getName() + "/leader";
this.sessionId = createSession();
new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
watchLeaderLockStateChange(sessionId);
client.keyValueClient().acquireLock(leaderLock, sessionId);
}
public boolean doIPossesLeaderLock() {
Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
if(leaderValue.isPresent()) {
Optional<String> session = leaderValue.get().getSession();
return session.isPresent() && session.get().equals(sessionId);
}
return false;
}
private String createSession() {
int sessionTtl = config.getService().getSessionTtl();
final Session session = ImmutableSession.builder()
.name(config.getService().getName())
.ttl(sessionTtl + "s")
.build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange(String sessionId) {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
kvCache.addListener(map -> {
Value value = map.get(leaderLock);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(leaderLock, sessionId);
}
});
kvCache.start();
}
}
请注意,该代码可能存在错误,因为我尚未对其进行全面测试。
你读完 https://learn.hashicorp.com/consul/developer-configuration/elections 了吗?它在使用 Consul 进行领导选举的应用程序级别介绍了这个场景。