响应式编程 - 运行 个集群中的作业
Reactive programming - running jobs in a cluster
我需要 运行 集群中的一些作业,一次只有一个。
因为我的团队使用 Hazelcast,所以我最终得到了一个基于
Hazelcast ILock 实施。出于问题的目的,我将对此进行概括。假设我们有以下接口(可以很容易地实现,例如通过 Hazelcast 或 Reddison (Redis)):
public interface MyDistributedLock {
boolean lock();
void unlock();
boolean isLockedByCurrentThread();
}
public interface MyLockDistributedFactory {
MyDistributedLock getLock(String name);
}
和lock方法在无法获取锁时等待:
private Mono<Void> lock(String name, Publisher<?> publisher, MyLockDistributedFactory myLockFactory) {
// important to release lock on the same thread as
// it was aquired
Scheduler scheduler = Schedulers.newSingle(name.toLowerCase());
return Mono.defer(() -> Mono.just(myLockFactory.getLock(name)))
publishOn(scheduler)
.doOnNext(MyDistributedLock::lock)
.doOnNext(lock -> LOGGER.info("Process acquired lock for resource {}", name))
.flatMapMany(lock -> Flux.from(publisher))
.publishOn(scheduler)
.doFinally(signalType -> {
MyDistributedLock lock = myLockFactory.getLock(name);
if (signalType == SignalType.CANCEL) {
// cancel ignores publishOn
scheduler.schedule(() -> {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
});
} else if (lock.isLockedByCurrentThread()) {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
}
})
.then();
}
和一些工作的例子
private Mono<Void> someJobRunEveryOneHourOnEveryNodeInCluster() {
MyLockDistributedFactory hazelcast = ...;
return lock("some-job", Flux.just(1,2), hazelcast)
.repeatWhen(afterOneHour());
}
我想知道这是否是使用 Project reactor(和正确实施)的好方法,还是应该以不同的方式完成。请指教
当 使用 Reactor 时,这是一种正确的方法,因为您已经注意将阻塞部分偏移到专用的 Scheduler
/Thread
。
但我要说的是,像这样的互斥代码通常不太适合反应式编程:您失去了用更少线程做更多事情的主要好处之一,您冒着阻塞应用程序其他部分的风险你忘了 publishOn
一个专用的线程等等...
我需要 运行 集群中的一些作业,一次只有一个。 因为我的团队使用 Hazelcast,所以我最终得到了一个基于 Hazelcast ILock 实施。出于问题的目的,我将对此进行概括。假设我们有以下接口(可以很容易地实现,例如通过 Hazelcast 或 Reddison (Redis)):
public interface MyDistributedLock {
boolean lock();
void unlock();
boolean isLockedByCurrentThread();
}
public interface MyLockDistributedFactory {
MyDistributedLock getLock(String name);
}
和lock方法在无法获取锁时等待:
private Mono<Void> lock(String name, Publisher<?> publisher, MyLockDistributedFactory myLockFactory) {
// important to release lock on the same thread as
// it was aquired
Scheduler scheduler = Schedulers.newSingle(name.toLowerCase());
return Mono.defer(() -> Mono.just(myLockFactory.getLock(name)))
publishOn(scheduler)
.doOnNext(MyDistributedLock::lock)
.doOnNext(lock -> LOGGER.info("Process acquired lock for resource {}", name))
.flatMapMany(lock -> Flux.from(publisher))
.publishOn(scheduler)
.doFinally(signalType -> {
MyDistributedLock lock = myLockFactory.getLock(name);
if (signalType == SignalType.CANCEL) {
// cancel ignores publishOn
scheduler.schedule(() -> {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
});
} else if (lock.isLockedByCurrentThread()) {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
}
})
.then();
}
和一些工作的例子
private Mono<Void> someJobRunEveryOneHourOnEveryNodeInCluster() {
MyLockDistributedFactory hazelcast = ...;
return lock("some-job", Flux.just(1,2), hazelcast)
.repeatWhen(afterOneHour());
}
我想知道这是否是使用 Project reactor(和正确实施)的好方法,还是应该以不同的方式完成。请指教
当 使用 Reactor 时,这是一种正确的方法,因为您已经注意将阻塞部分偏移到专用的 Scheduler
/Thread
。
但我要说的是,像这样的互斥代码通常不太适合反应式编程:您失去了用更少线程做更多事情的主要好处之一,您冒着阻塞应用程序其他部分的风险你忘了 publishOn
一个专用的线程等等...