使用 Map 计算解决 Java 线程可见性和并发性错误

Resolving Java thread visibility and concurrency error using Map compute

我使用 Java 8. 我有一个事件处理程序,它以高速率(每秒 n 个)接受事件,当我收到这么多事件时(在这个简化的例子 1000)

第 25 行 myCache.get(event.getKey()).add(event.getBean()); 是否存在可见性错误? 我应该在 handleEvent() 方法上同步吗?

public class myClass extends MySimpleEventHanlder {
    private Map<String, List<MyBean>> myCache;
    private ScheduledExecutorService scheduler;

    public void MyClass() {
        myCache = new ConcurrentHashMap<String, List<MyBean>>();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {

            for (Iterator<Map.Entry<String, List<MyBean>>> it = myCache.entrySet().iterator(); it.hasNext();) {
                Map.Entry<String, List<MyBean>> entry = it.next();
                if (entry.getValue().size() >= 1000) {
                    it.remove();
                    //do some more processing , flush to storage
                }
            }
        }, 0, 60, TimeUnit.SECONDS);
    }

    @Override
    public void handleEvent(Event event) {

        if (myCachetCache.containsKey(event.getKey())) {
            myCache.get(event.getKey()).add(event.getBean());
        }
        else{
            List<MyBean> beans = new ArrayList<MyBeans>();
            beans.add(event.getBean());
            myCache.put(event.key, beans);
        }
    }
}

您肯定有可见性问题:您在一个线程中将项目添加到 ArrayList 中,然后在另一个线程中从该 ArrayList 中读取 size(),两者之间没有同步。

另一个问题是在调用 myCache.containsKeymyCache.get 之间可能会删除密钥。这将导致 NullPointerException。这可以通过使用保证原子性的计算来解决。

    myCache.compute(event.getKey(), (key, value) -> {
        if (value == null) {
            value = new ArrayList<>();
        }
        value.add(event.getBean());
        return value;
    });