每当两个流上发生写入时如何阻止读取?
How to block reads whenever writes are happening on two flows?
我正在尝试实现锁定,希望通过该锁定避免在我进行写入时发生读取。
我的要求是:
- 读取块,直到第一次设置所有两个地图。
- 现在第二次,如果我正在更新地图,我仍然可以 return 所有两个旧地图值(在所有两个地图上完成更新之前)或者它应该阻塞并且 return 每当在所有两张地图上完成更新时,我所有新的两张地图值。
因为我有两个地图 - primaryMapping
, secondaryMapping
所以它应该 return 两个更新地图的所有新值或者它应该 return 所有旧的地图的价值。基本上,在更新时我不想 return primaryMapping
有旧值,secondaryMapping
有新值。它应该是一致的,要么它应该 return 旧值,要么它应该在更新地图后 return 新值。就我而言,地图更新将在 7 或 8 个月内发生一次(很少)。我正在为此使用 Countdown Latch,到目前为止它工作正常,没有任何问题。
现在我有两个流程,如下所示:对于每个流程,我有一个 URL 从那里我们可以获取上述两个地图的数据。通常,我将为这两个流设置两个映射,因此与设备流相比,PROCESS 流的这两个映射具有不同的值。
public enum FlowType {
PROCESS, DEVICE;
}
我有一个后台线程 运行 每 5 分钟从每个 FLOW url 获取数据并在有更新时填充这两个地图。当应用程序第一次启动时,它会更新映射,之后,它会在 7-8 个月后更新映射。并且这并不意味着两个流映射会同时发生变化。 PROCESS 映射可能有变化但 DEVICE 映射没有变化。
public class DataScheduler {
private RestTemplate restTemplate = new RestTemplate();
private static final String PROCESS_URL = "http://process_flow_url/";
private static final String DEVICE_URL = "http://device_flow_url/";
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void startScheduleTask() {
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
callService();
} catch (Exception ex) {
// logging exception here using logger
}
}
}, 0, 5, TimeUnit.MINUTES);
}
public void callService() throws Exception {
String url = null;
Map<FlowType, String> holder = new HashMap<FlowType, String>();
for (FlowType flow : FlowType.values()) {
try {
url = getURL(flow);
String response = restTemplate.getForObject(url, String.class);
holder.put(flow, response);
} catch (RestClientException ex) {
// logging exception here using logger
}
}
parseResponse(holder);
}
private void parseResponse(Map<FlowType, String> responses) throws Exception {
Map<FlowType, Mapping> partitionMapper = new HashMap<FlowType, Mapping>();
boolean update = false;
for (Map.Entry<FlowType, String> responseEntry : responses.entrySet()) {
FlowType flow = responseEntry.getKey();
String response = responseEntry.getValue();
Map<String, Map<Integer, Integer>> primaryMapping = new HashMap<>();
Map<String, Map<Integer, Integer>> secondaryMapping = new HashMap<>();
if (!DataUtils.isEmpty(response)) {
try (Scanner scanner = new Scanner(response)) {
boolean hasProcess = Boolean.parseBoolean(scanner.nextLine().trim().substring(HAS_PROCESS_LEN));
if (hasProcess) {
update = true;
// some code
partitionMapper.put(flow, PartitionHolder.createMapping(
primaryMapping, secondaryMapping));
}
}
}
}
// if there is any update, then only update the mappings, otherwise not.
if (update) {
PartitionHolder.setMappings(partitionMapper);
}
}
}
正如您在上面看到的,如果任何流映射已更新,它会通过调用 setMappings
方法来更新映射。
下面是我的 PartitionHolder class:
public class PartitionHolder {
public static class Mapping {
public final Map<String, Map<Integer, Integer>> primaryMapping;
public final Map<String, Map<Integer, Integer>> secondaryMapping;
public Mapping(Map<String, Map<Integer, Integer>> primaryMapping,
Map<String, Map<Integer, Integer>> secondaryMapping) {
this.primaryMapping = primaryMapping;
this.secondaryMapping = secondaryMapping;
}
// getters here
}
private static final AtomicReference<Map<FlowType, Mapping>> mappingsHolder = new AtomicReference<Map<FlowType, Mapping>>();
private static final CountDownLatch hasInitialized = new CountDownLatch(1);
public static Mapping getFlowMapping(FlowType flowType) {
try {
hasInitialized.await();
return mappingsHolder.get().get(flowType);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
public static void setMappings(Map<FlowType, Mapping> newMapData) {
mappingsHolder.set(newMapData);
hasInitialized.countDown();
}
public static Mapping createMapping(
Map<String, Map<Integer, Integer>> primaryMapping,
Map<String, Map<Integer, Integer>> secondaryMapping) {
return new Mapping(primaryMapping, secondaryMapping);
}
}
现在这就是我在主线程中使用 PartitionHolder
class 的方式。在一次调用中,我们将只获得一个流的映射。在下面的代码中 dataKey.getFlowType()
可以是 PROCESS OR DEVICE。
@Override
public DataResponse call() throws Exception {
Mapping mappings = PartitionHolder.getFlowMapping(dataKey.getFlowType());
// use mappings object here
}
现在,如您所见,在我上面的代码中,如果任何流映射已更新,我将通过调用 setMappings
方法来更新映射。可能会发生两种情况:
- 当应用程序第一次启动时,它可以正常工作,没有任何问题,因为它会更新两个流的映射。
- 现在第二次,假设对于 PROCESS 流,映射已更新,但对于 DEVICE 流映射尚未更新,那么在这种情况下,它将通过调用
setMappings
方法更新整个映射,该方法将覆盖mappingsHolder
。这种方法的问题是,对于设备流,我将在调用 getFlowMapping
时开始获取空映射,因为它会将设备映射覆盖为空映射。对吧?
如何避免第二个问题?除了使用 Map 和 key 作为 FlowType 之外,有没有更好的方法来解决这个问题?我需要在这里使用工厂模式来解决这个问题还是其他更好的方法?
ConcurrentHashMap
是线程安全的并且相对较快,因此可以简化问题:
private static final ConcurrentHashMap<FlowType, Mapping> mappingsHolder =
new ConcurrentHashMap<FlowType, Mapping>();
public static void setMappings(FlowType flowType, Mapping newMapData) {
mappingsHolder.put(flowType, newMapData);
hasInitialized.countDown();
}
public static Mapping getFlowMapping(FlowType flowType) {
try {
hasInitialized.await();
return mappingsHolder.get(flowType);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
那应该可以解决第二个问题。
我同意 vanOekel 的观点,ConcurrentHashMap
会有所帮助 - 在你的 DataScheduler
中执行 try (Scanner scanner = new Scanner(response))
块内的地图更新 - 如果抛出异常,那么地图将保持不变。
添加一个
public static boolean isInitialized() {
return hasInitialized.getCount() == 0;
}
方法为 PartitionHolder
,然后修改 callService
以在映射尚未初始化时执行重试(这将永远重试,或者您可以重试 N
次,然后使用默认数据或中止程序或第三个选项)。
public void callService() throws Exception {
boolean doRetry = true;
while(doRetry) {
String url = null;
Map<FlowType, String> holder = new HashMap<FlowType, String>();
for (FlowType flow : FlowType.values()) {
try {
url = getURL(flow);
String response = restTemplate.getForObject(url, String.class);
holder.put(flow, response);
} catch (RestClientException ex) {
// logging exception here using logger
}
}
try {
parseResponse(holder);
doRetry = false;
} catch(Exception e) {
doRetry = !PartitionHolder.isInitialized();
if(PartitionHolder.isInitialized()) {
throw e;
}
}
}
}
我正在尝试实现锁定,希望通过该锁定避免在我进行写入时发生读取。
我的要求是:
- 读取块,直到第一次设置所有两个地图。
- 现在第二次,如果我正在更新地图,我仍然可以 return 所有两个旧地图值(在所有两个地图上完成更新之前)或者它应该阻塞并且 return 每当在所有两张地图上完成更新时,我所有新的两张地图值。
因为我有两个地图 - primaryMapping
, secondaryMapping
所以它应该 return 两个更新地图的所有新值或者它应该 return 所有旧的地图的价值。基本上,在更新时我不想 return primaryMapping
有旧值,secondaryMapping
有新值。它应该是一致的,要么它应该 return 旧值,要么它应该在更新地图后 return 新值。就我而言,地图更新将在 7 或 8 个月内发生一次(很少)。我正在为此使用 Countdown Latch,到目前为止它工作正常,没有任何问题。
现在我有两个流程,如下所示:对于每个流程,我有一个 URL 从那里我们可以获取上述两个地图的数据。通常,我将为这两个流设置两个映射,因此与设备流相比,PROCESS 流的这两个映射具有不同的值。
public enum FlowType {
PROCESS, DEVICE;
}
我有一个后台线程 运行 每 5 分钟从每个 FLOW url 获取数据并在有更新时填充这两个地图。当应用程序第一次启动时,它会更新映射,之后,它会在 7-8 个月后更新映射。并且这并不意味着两个流映射会同时发生变化。 PROCESS 映射可能有变化但 DEVICE 映射没有变化。
public class DataScheduler {
private RestTemplate restTemplate = new RestTemplate();
private static final String PROCESS_URL = "http://process_flow_url/";
private static final String DEVICE_URL = "http://device_flow_url/";
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void startScheduleTask() {
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
callService();
} catch (Exception ex) {
// logging exception here using logger
}
}
}, 0, 5, TimeUnit.MINUTES);
}
public void callService() throws Exception {
String url = null;
Map<FlowType, String> holder = new HashMap<FlowType, String>();
for (FlowType flow : FlowType.values()) {
try {
url = getURL(flow);
String response = restTemplate.getForObject(url, String.class);
holder.put(flow, response);
} catch (RestClientException ex) {
// logging exception here using logger
}
}
parseResponse(holder);
}
private void parseResponse(Map<FlowType, String> responses) throws Exception {
Map<FlowType, Mapping> partitionMapper = new HashMap<FlowType, Mapping>();
boolean update = false;
for (Map.Entry<FlowType, String> responseEntry : responses.entrySet()) {
FlowType flow = responseEntry.getKey();
String response = responseEntry.getValue();
Map<String, Map<Integer, Integer>> primaryMapping = new HashMap<>();
Map<String, Map<Integer, Integer>> secondaryMapping = new HashMap<>();
if (!DataUtils.isEmpty(response)) {
try (Scanner scanner = new Scanner(response)) {
boolean hasProcess = Boolean.parseBoolean(scanner.nextLine().trim().substring(HAS_PROCESS_LEN));
if (hasProcess) {
update = true;
// some code
partitionMapper.put(flow, PartitionHolder.createMapping(
primaryMapping, secondaryMapping));
}
}
}
}
// if there is any update, then only update the mappings, otherwise not.
if (update) {
PartitionHolder.setMappings(partitionMapper);
}
}
}
正如您在上面看到的,如果任何流映射已更新,它会通过调用 setMappings
方法来更新映射。
下面是我的 PartitionHolder class:
public class PartitionHolder {
public static class Mapping {
public final Map<String, Map<Integer, Integer>> primaryMapping;
public final Map<String, Map<Integer, Integer>> secondaryMapping;
public Mapping(Map<String, Map<Integer, Integer>> primaryMapping,
Map<String, Map<Integer, Integer>> secondaryMapping) {
this.primaryMapping = primaryMapping;
this.secondaryMapping = secondaryMapping;
}
// getters here
}
private static final AtomicReference<Map<FlowType, Mapping>> mappingsHolder = new AtomicReference<Map<FlowType, Mapping>>();
private static final CountDownLatch hasInitialized = new CountDownLatch(1);
public static Mapping getFlowMapping(FlowType flowType) {
try {
hasInitialized.await();
return mappingsHolder.get().get(flowType);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
public static void setMappings(Map<FlowType, Mapping> newMapData) {
mappingsHolder.set(newMapData);
hasInitialized.countDown();
}
public static Mapping createMapping(
Map<String, Map<Integer, Integer>> primaryMapping,
Map<String, Map<Integer, Integer>> secondaryMapping) {
return new Mapping(primaryMapping, secondaryMapping);
}
}
现在这就是我在主线程中使用 PartitionHolder
class 的方式。在一次调用中,我们将只获得一个流的映射。在下面的代码中 dataKey.getFlowType()
可以是 PROCESS OR DEVICE。
@Override
public DataResponse call() throws Exception {
Mapping mappings = PartitionHolder.getFlowMapping(dataKey.getFlowType());
// use mappings object here
}
现在,如您所见,在我上面的代码中,如果任何流映射已更新,我将通过调用 setMappings
方法来更新映射。可能会发生两种情况:
- 当应用程序第一次启动时,它可以正常工作,没有任何问题,因为它会更新两个流的映射。
- 现在第二次,假设对于 PROCESS 流,映射已更新,但对于 DEVICE 流映射尚未更新,那么在这种情况下,它将通过调用
setMappings
方法更新整个映射,该方法将覆盖mappingsHolder
。这种方法的问题是,对于设备流,我将在调用getFlowMapping
时开始获取空映射,因为它会将设备映射覆盖为空映射。对吧?
如何避免第二个问题?除了使用 Map 和 key 作为 FlowType 之外,有没有更好的方法来解决这个问题?我需要在这里使用工厂模式来解决这个问题还是其他更好的方法?
ConcurrentHashMap
是线程安全的并且相对较快,因此可以简化问题:
private static final ConcurrentHashMap<FlowType, Mapping> mappingsHolder =
new ConcurrentHashMap<FlowType, Mapping>();
public static void setMappings(FlowType flowType, Mapping newMapData) {
mappingsHolder.put(flowType, newMapData);
hasInitialized.countDown();
}
public static Mapping getFlowMapping(FlowType flowType) {
try {
hasInitialized.await();
return mappingsHolder.get(flowType);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
那应该可以解决第二个问题。
我同意 vanOekel 的观点,ConcurrentHashMap
会有所帮助 - 在你的 DataScheduler
中执行 try (Scanner scanner = new Scanner(response))
块内的地图更新 - 如果抛出异常,那么地图将保持不变。
添加一个
public static boolean isInitialized() {
return hasInitialized.getCount() == 0;
}
方法为 PartitionHolder
,然后修改 callService
以在映射尚未初始化时执行重试(这将永远重试,或者您可以重试 N
次,然后使用默认数据或中止程序或第三个选项)。
public void callService() throws Exception {
boolean doRetry = true;
while(doRetry) {
String url = null;
Map<FlowType, String> holder = new HashMap<FlowType, String>();
for (FlowType flow : FlowType.values()) {
try {
url = getURL(flow);
String response = restTemplate.getForObject(url, String.class);
holder.put(flow, response);
} catch (RestClientException ex) {
// logging exception here using logger
}
}
try {
parseResponse(holder);
doRetry = false;
} catch(Exception e) {
doRetry = !PartitionHolder.isInitialized();
if(PartitionHolder.isInitialized()) {
throw e;
}
}
}
}