How to implement a distributed lock around a poller in Spring Integration using ZooKeeper
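Spring Integration has ZooKeeper support, as documented at https://docs.spring.io/spring-integration/reference/html/zookeeper.html
However, that documentation is too vague.
It suggests adding the bean below, but gives no details on how to start/stop a poller when the node is granted leadership.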
@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
            .setClient(client)
            .setPath("/siTest/")
            .setRole("cluster");
}
Is there an example of how to use ZooKeeper to ensure that the poller below runs on only one node in the cluster at any time?
@Component
public class EventsPoller {

    public void pullEvents() {
        // pull events should be run by only one node in the cluster at any time
    }

}
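The LeaderInitiator emits an OnGrantedEvent when it becomes the leader and an OnRevokedEvent when its leadership is revoked.
For more information on handling these events and how they affect components in a particular role, see https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#endpoint-roles and the section after it, https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#leadership-event-handling.
I do agree, though, that the ZooKeeper chapter should have a link to that SmartLifecycleRoleController chapter. Feel free to raise a JIRA on the matter; contributions are welcome!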
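To make the event handling concrete, here is a minimal sketch of the EventsPoller from the question reacting to those two events directly. It assumes a LeaderInitiator bean is present in the same application context and that scheduling is enabled elsewhere with @EnableScheduling; the leader flag and the 1000 ms delay are illustrative choices, not something taken from the reference manual.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.context.event.EventListener;
import org.springframework.integration.leader.event.OnGrantedEvent;
import org.springframework.integration.leader.event.OnRevokedEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class EventsPoller {

    // Tracks whether this node currently holds leadership (illustrative field).
    private final AtomicBoolean leader = new AtomicBoolean(false);

    // Published when this node is elected leader.
    @EventListener
    public void onGranted(OnGrantedEvent event) {
        this.leader.set(true);
    }

    // Published when this node loses leadership.
    @EventListener
    public void onRevoked(OnRevokedEvent event) {
        this.leader.set(false);
    }

    // Assumption: @EnableScheduling is declared on some configuration class.
    @Scheduled(fixedDelay = 1000)
    public void pullEvents() {
        if (!this.leader.get()) {
            return; // not the leader, so skip this cycle
        }
        // pull events here; only the current leader reaches this point
    }

}
```

A flag like this only skips work on non-leader nodes. It is not a strict lock: a pull that is already in progress when leadership is revoked will still run to completion.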
UPDATE
Here is what I did in a test:
@RunWith(SpringRunner.class)
@DirtiesContext
public class LeaderInitiatorFactoryBeanTests extends ZookeeperTestSupport {

    private static CuratorFramework client;

    @Autowired
    private PollableChannel stringsChannel;

    @BeforeClass
    public static void getClient() throws Exception {
        client = createNewClient();
    }

    @AfterClass
    public static void closeClient() {
        if (client != null) {
            client.close();
        }
    }

    @Test
    public void test() {
        assertNotNull(this.stringsChannel.receive(10_000));
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Bean
        public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
            return new LeaderInitiatorFactoryBean()
                    .setClient(client)
                    .setPath("/siTest/")
                    .setRole("foo");
        }

        @Bean
        public CuratorFramework client() {
            return LeaderInitiatorFactoryBeanTests.client;
        }

        @Bean
        @InboundChannelAdapter(channel = "stringsChannel", autoStartup = "false", poller = @Poller(fixedDelay = "100"))
        @Role("foo")
        public Supplier<String> inboundChannelAdapter() {
            return () -> "foo";
        }

        @Bean
        public PollableChannel stringsChannel() {
            return new QueueChannel();
        }

    }

}
And I get something like this in the logs:
2018-12-14 10:12:33,542 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Starting [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
2018-12-14 10:12:33,578 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Stopping [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
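Applied to the setup from the question, the same role-based approach might look like the sketch below. It reuses the "cluster" role from the question's LeaderInitiatorFactoryBean; the class name ClusterPollerConfig, the eventsChannel bean, the handleEvent method, the placeholder payload and the 1000 ms delay are all hypothetical. Because pullEvents() in the question returns void, the polled source is written here as a Supplier, as in the test above.

```java
import java.util.function.Supplier;

import org.apache.curator.framework.CuratorFramework;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Role;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zookeeper.config.LeaderInitiatorFactoryBean;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class ClusterPollerConfig {

    // Takes part in leader election; the role name links it to the endpoint below.
    @Bean
    public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
        return new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/")
                .setRole("cluster");
    }

    // autoStartup = "false" keeps the adapter stopped until leadership is granted;
    // @Role("cluster") lets the role controller start and stop it on leader events.
    @Bean
    @InboundChannelAdapter(channel = "eventsChannel", autoStartup = "false",
            poller = @Poller(fixedDelay = "1000"))
    @Role("cluster")
    public Supplier<String> eventsSource() {
        // Placeholder payload: replace with the real pull logic.
        return () -> "event";
    }

    @Bean
    public MessageChannel eventsChannel() {
        return new DirectChannel();
    }

    // Hypothetical downstream handler for each polled event.
    @ServiceActivator(inputChannel = "eventsChannel")
    public void handleEvent(String event) {
        // process the event
    }

}
```

With this arrangement the poller is idle on every node that is not the leader, and no explicit lock code is needed in the polling method itself.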