Modifying hash map from a single thread and reading from multiple threads?

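I have a class in which I populate a map, liveSocketsByDatacenter, from a single background thread every 30 seconds. I also have a method, getNextSocket, that is called by multiple reader threads to get an available live socket; it uses the same map to get this information.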

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  public Optional<SocketHolder> getNextSocket() {
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      Collections.shuffle(listOfEndPoints);
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          return Optional.of(obj);
        }
      }
    }
    return Optional.absent();
  }

  private void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();

        boolean status = SendToSocket.getInstance().execute(3, holder, socket);
        boolean isLive = (status) ? true : false;
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
    }
  }
}

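As you can see in my class above:

Is my code above thread safe, and will all reader threads see liveSocketsByDatacenter accurately? I modify the liveSocketsByDatacenter map every 30 seconds from a single background thread, and many reader threads call the getNextSocket method, so I am not sure whether I am doing anything wrong here.

It looks like there might be a thread-safety issue in my getLiveSocket method, since every read gets a shared ArrayList out of the map and shuffles it. There may be other places I have missed as well. What is the best way to fix these thread-safety issues in my code?

If there is a better way to rewrite this, I am open to that too.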

It seems you can safely use a ConcurrentHashMap here instead of a regular HashMap, and it should work.

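With your current approach, using a regular HashMap, you need to synchronize every method that updates or reads the HashMap: getNextSocket, connectToZMQSockets and updateLiveSockets. You can do that with the synchronized keyword on those methods, or with another lock on a monitor common to all of them. This is not because of ConcurrentModificationException, but because without synchronization the reading threads can see stale values.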
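The synchronized approach described above could look roughly like the following. This is a minimal sketch, not the asker's class: SynchronizedRegistry, update and snapshot are hypothetical names, and plain strings stand in for Datacenters and SocketHolder. The point is only that the writer and all readers lock the same monitor, and that readers receive a private copy they are free to shuffle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SocketManager: every access to the plain HashMap
// goes through a synchronized method, so all of them lock the same monitor (this).
class SynchronizedRegistry {
    private final Map<String, List<String>> socketsByDatacenter = new HashMap<>();

    // Writer: called by the single background thread.
    synchronized void update(String datacenter, List<String> endpoints) {
        socketsByDatacenter.put(datacenter, new ArrayList<>(endpoints));
    }

    // Reader: called by many threads. Returns a private copy, so a caller
    // can shuffle or modify the result without touching shared state.
    synchronized List<String> snapshot(String datacenter) {
        List<String> endpoints = socketsByDatacenter.get(datacenter);
        return endpoints == null ? new ArrayList<String>() : new ArrayList<>(endpoints);
    }
}
```

The cost of this design is that every reader takes the lock on every call, which is why the later answers look at lock-free alternatives for a read-mostly map.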

There is also a concurrent modification problem in getLiveSocket. One of the simplest ways to avoid it is to copy listOfEndPoints to a new list before the shuffle, like this:

private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> endPoints) {
    // Check before copying: the map lookup may have returned null
    if (!CollectionUtils.isEmpty(endPoints)) {
        // Shuffle a private copy so the shared list stored in the map is never mutated
        List<SocketHolder> listOfEndPoints = new ArrayList<SocketHolder>(endPoints);
        Collections.shuffle(listOfEndPoints);
        for (SocketHolder obj : listOfEndPoints) {
            if (obj.isLive()) {
                return Optional.of(obj);
            }
        }
    }
    return Optional.absent();
}

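As the HashMap documentation notes, if multiple threads access a hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally to avoid an inconsistent view of the contents. So, to be thread safe, you should use either the Java Collections synchronizedMap() method or a ConcurrentHashMap.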

//synchronizedMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = Collections.synchronizedMap(new HashMap<Datacenters, List<SocketHolder>>());    

//ConcurrentHashMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<Datacenters, List<SocketHolder>>();

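Since you have a highly concurrent application that modifies and reads key values in different threads, you should also have a look at the Producer-Consumer principle.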

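Using a ConcurrentHashMap should make the map accesses in your code thread safe. Alternatively, use synchronized methods to access the existing HashMap.

To be thread safe, your code must synchronize any access to all shared mutable state.

Here you share liveSocketsByDatacenter, an instance of HashMap, which is a non-thread-safe implementation of Map. It can potentially be read (by updateLiveSockets and getNextSocket) and modified (by connectToZMQSockets and updateLiveSockets) concurrently without any synchronization, which is already enough to make your code non-thread-safe. Moreover, the values of this Map are instances of ArrayList, a non-thread-safe implementation of List, which can also potentially be read (by getNextSocket and updateLiveSockets) and modified (by getLiveSocket, more precisely by Collections.shuffle) concurrently.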

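A simple way to fix your two thread-safety issues is to:

  1. Use a ConcurrentHashMap instead of a HashMap for the variable liveSocketsByDatacenter, as it is a natively thread-safe implementation of Map.
  2. Put the unmodifiable version of your ArrayList instances as the values of your map, using Collections.unmodifiableList(List<? extends T> list). Your lists will then be immutable (as long as nothing keeps modifying the underlying ArrayList) and therefore thread safe.

For example:

liveSocketsByDatacenter.put(
    entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);

  3. Rewrite your method getLiveSocket to avoid calling Collections.shuffle directly on your list. For example, you could shuffle only the list of live sockets instead of all sockets, or use a copy of your list (with, for example, new ArrayList<>(listOfEndPoints)) instead of the list itself.

For example: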

private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
            if (obj.isLive()) {
                liveOnly.add(obj);
            }
        }
        if (!liveOnly.isEmpty()) {
            // The list is not empty, so we shuffle it and return the first element
            Collections.shuffle(liveOnly);
            return Optional.of(liveOnly.get(0));
        }
    }
    return Optional.absent();
}
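
As a variation on the method above, shuffling the whole filtered list is not strictly necessary when only one element is returned: picking a random index gives the same uniform choice. The sketch below is an illustration under assumptions, not the answer's code. RandomLivePick, Endpoint and pickLive are hypothetical names, Endpoint stands in for SocketHolder, and java.util.Optional replaces the Guava Optional used in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

class RandomLivePick {
    // Hypothetical minimal endpoint type standing in for SocketHolder.
    static final class Endpoint {
        final String address;
        final boolean live;

        Endpoint(String address, boolean live) {
            this.address = address;
            this.live = live;
        }
    }

    // Returns a uniformly random live endpoint, or an empty Optional if none is live.
    // The input list is only read, never mutated, so it can be shared between threads.
    static Optional<Endpoint> pickLive(List<Endpoint> endpoints) {
        if (endpoints == null || endpoints.isEmpty()) {
            return Optional.empty();
        }
        List<Endpoint> liveOnly = new ArrayList<>(endpoints.size());
        for (Endpoint endpoint : endpoints) {
            if (endpoint.live) {
                liveOnly.add(endpoint);
            }
        }
        if (liveOnly.isEmpty()) {
            return Optional.empty();
        }
        // ThreadLocalRandom avoids contention on a single shared Random instance.
        int index = ThreadLocalRandom.current().nextInt(liveOnly.size());
        return Optional.of(liveOnly.get(index));
    }
}
```

Because pickLive never writes to its argument, it also works when the map stores unmodifiable lists.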

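For #1, since you seem to read your map frequently and modify it rarely (only once every 30 seconds), you could consider rebuilding your map and then sharing its immutable version (using Collections.unmodifiableMap(Map<? extends K,? extends V> m)) every 30 seconds. This approach is very efficient in a mostly-read scenario, as you no longer pay the price of any synchronization mechanism to access the content of your map.

Your code would then be: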

// Your variable is no longer final; it is now volatile, which guarantees
// that once the writer assigns a new map, every thread reading the field
// afterwards sees that new map and its fully built content
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter 
    = Collections.unmodifiableMap(new HashMap<>());

private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : 
        socketsByDatacenter.entrySet()) {

        List<SocketHolder> addedColoSockets = connect(
            entry.getKey(), entry.getValue(), ZMQ.PUSH
        );
        liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
    }
    // Set the new content of my map as an unmodifiable map
    this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}

public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        this.liveSocketsByDatacenter;
    ...
}
...
// Added the modifier synchronized to prevent concurrent modification.
// It is needed because to build the new map we first need to get the
// old one, so both must be done atomically to prevent consistency issues
private synchronized void updateLiveSockets() {
    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        new HashMap<>(this.liveSocketsByDatacenter);
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
        ...
        // Replace the entry in the local copy; the shared map is never modified in place
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
    // Publish the updated copy as an unmodifiable map
    this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}
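
The rebuild-and-publish idea above can be tried in isolation with a small self-contained sketch. Everything named here is an assumption made for illustration: SnapshotRegistry, publish and endpoints are hypothetical, and strings stand in for Datacenters and SocketHolder. It shows the two properties the approach relies on: readers perform a single volatile read and then work on one consistent map, and neither the published map nor its lists can be changed afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotRegistry {
    // The only shared mutable state is this reference; the map it points to never changes.
    private volatile Map<String, List<String>> snapshot = Collections.emptyMap();

    // Writer side (single background thread): build a fresh map, then publish it in one write.
    void publish(Map<String, List<String>> fresh) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : fresh.entrySet()) {
            // Defensive copy, so later changes to the caller's lists cannot leak in.
            copy.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
        }
        snapshot = Collections.unmodifiableMap(copy);
    }

    // Reader side (many threads): one volatile read, then only local, read-only access.
    List<String> endpoints(String datacenter) {
        Map<String, List<String>> current = snapshot;
        List<String> result = current.get(datacenter);
        return result == null ? Collections.<String>emptyList() : result;
    }
}
```

A reader that wants a random order still has to copy the returned list before shuffling it, since calling Collections.shuffle on the unmodifiable list throws UnsupportedOperationException.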

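Your field liveSocketsByDatacenter could also be of type AtomicReference<Map<Datacenters, List<SocketHolder>>>. It would then be final, and your map would still be stored in a volatile variable, but inside the class AtomicReference.

The previous code would then be: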

private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter 
    = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));

...

private void connectToZMQSockets() {
    ...
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}

public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        this.liveSocketsByDatacenter.get();
    ...
}

// Added the modifier synchronized to prevent concurrent modification.
// It is needed because to build the new map we first need to get the
// old one, so both must be done atomically to prevent consistency issues
private synchronized void updateLiveSockets() {
    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        new HashMap<>(this.liveSocketsByDatacenter.get());
    ...
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
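
With an AtomicReference, the read-copy-write step can also be expressed without synchronized by using AtomicReference.updateAndGet (available since Java 8). This is a swapped-in alternative to the synchronized method above, not what the answer itself uses, and the names AtomicSnapshotRegistry, replace and current are hypothetical. Strings again stand in for Datacenters and SocketHolder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class AtomicSnapshotRegistry {
    private final AtomicReference<Map<String, List<String>>> ref =
        new AtomicReference<>(Collections.<String, List<String>>emptyMap());

    // Atomically replaces one datacenter's list while keeping all other entries.
    void replace(String datacenter, List<String> endpoints) {
        List<String> frozen = Collections.unmodifiableList(new ArrayList<>(endpoints));
        // updateAndGet may re-run the function if another thread updated the reference
        // in the meantime, so the function must be free of side effects.
        ref.updateAndGet(old -> {
            Map<String, List<String>> next = new HashMap<>(old);
            next.put(datacenter, frozen);
            return Collections.unmodifiableMap(next);
        });
    }

    // Readers take one snapshot and use it for the whole operation.
    Map<String, List<String>> current() {
        return ref.get();
    }
}
```

Since the update function can be retried, slow or side-effecting work such as probing a socket belongs outside the lambda; only the cheap copy-and-put step should sit inside it.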