Java 同步取决于方法参数

Java synchronization depending on method parameter

如何根据方法参数值提供同步?

所有使用 'same' 参数值 A 的方法调用都应该同步。具有不同参数值的方法调用,例如B 可以访问,即使与 A 的呼叫已经在等待。对 B 的下一个并发调用也必须等待第一个 B 被释放。

我的用例:我想在 ID 级别同步 对 JPA 实体的访问,但想避免悲观锁定,因为我需要一种队列。用于锁定的 'key' 旨在成为 entity ID - 实际上是 Java Long 类型。

protected void entityLockedAccess(SomeEntity myEntity) {
    //getId() returns different Long objects so the lock does not work
    synchronized (myEntity.getId()) {
        //the critical section ...
    }
}

我阅读了有关锁定对象的信息,但不确定它们是否适合我的情况。 在顶层,我想管理对执行关键代码的应用程序的特定 REST 调用。

谢谢, 克里斯

问题是您不应该在 上同步(例如字符串,或 Integer 对象)。

含义:您需要在这里定义一些特殊的 EntityId class,当然,所有使用相同 ID 的 "data" 都需要使用相同的 然后是 EntityId 对象。

被合并并可能被重用的对象不应该用于同步。如果是这样,它可能会导致不相关的线程因无用的堆栈跟踪而死锁。

具体来说,String 字面量和盒装基元(例如 Integers)应该 用作锁定对象,因为它们被合并和重用。

Boolean 对象的情况更糟,因为只有 BooleanBoolean.TRUEBoolean.FALSE 两个实例,每个 class 使用布尔值将指代两者之一。

I read about lock objects but I am not sure how they would suit in my case. On the top level I want to manage a specific REST call to my application which executes critical code.

您的数据库将处理并发写入和其他事务性问题。 您需要做的就是使用交易。

我还建议您解决 class 问题(DIRTY READs NON Repeatable reads). You can also use Optimistic Locking 对于

据我了解,您基本上希望为每个 SomeEntity ID 使用不同的唯一锁。

你可以用 Map<Integer, Object> 来实现这一点。

您只需将每个 ID 映射到一个对象。如果已经有一个对象,您可以重用它。这可能看起来像这样:

static Map<Integer, Object> locks = new ConcurrentHashMap<>();

public static void main(String[] args)
{
    int i1 = 1;
    int i2 = 2;

    foo(i1);
    foo(i1);
    foo(i2);
}

public static void foo(int o)
{
    synchronized (locks.computeIfAbsent(o, k -> new Object()))
    {
        // computation
    }
}

这将在映射中创建 2 个锁对象,因为 i1 的对象在第二次 foo(i1) 调用中被重复使用。

    private static final Set<Integer> lockedIds = new HashSet<>();

    private void lock(Integer id) throws InterruptedException {
        synchronized (lockedIds) {
            while (!lockedIds.add(id)) {
                lockedIds.wait();
            }
        }
    }

    private void unlock(Integer id) {
        synchronized (lockedIds) {
            lockedIds.remove(id);
            lockedIds.notifyAll();
        }
    }

    public void entityLockedAccess(SomeEntity myEntity) throws InterruptedException {
        try {
            lock(myEntity.getId());

            //Put your code here.
            //For different ids it is executed in parallel.
            //For equal ids it is executed synchronously.

        } finally {
            unlock(myEntity.getId());
        }
    }
  • id 不仅可以是 'Integer',还可以是正确覆盖 'equals'[=20= 的任何 class ] 和 'hashCode' 方法。
  • try-finally - 非常重要 - 即使您的操作抛出异常,您也必须保证在操作后解锁等待线程。
  • 如果您的后端分布在 多个 servers/JVMs.
  • 中,它将无法工作

只需使用这个 class: (并且地图不会随时间增加)

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class SameKeySynchronizer<T> {
    
    private final ConcurrentHashMap<T, Object> sameKeyTasks = new ConcurrentHashMap<>();

    public void serializeSameKeys(T key, Consumer<T> keyConsumer) {
        // This map will never be filled (because function returns null), it is only used for synchronization purposes for the same key
        sameKeyTasks.computeIfAbsent(key, inputArgumentKey -> acceptReturningNull(inputArgumentKey, keyConsumer));
    }

    private Object acceptReturningNull(T inputArgumentKey, Consumer<T> keyConsumer) {
        keyConsumer.accept(inputArgumentKey);
        return null;
    }
}

喜欢这个测试:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class SameKeySynchronizerTest {

    private static final boolean SHOW_FAILING_TEST = false;

    @Test
    void sameKeysAreNotExecutedParallel() throws InterruptedException {

        TestService testService = new TestService();

        TestServiceThread testServiceThread1 = new TestServiceThread(testService, "a");
        TestServiceThread testServiceThread2 = new TestServiceThread(testService, "a");

        testServiceThread1.start();
        testServiceThread2.start();

        testServiceThread1.join();
        testServiceThread2.join();

        Assertions.assertFalse(testService.sameKeyInProgressSimultaneously);
    }

    @Test
    void differentKeysAreExecutedParallel() throws InterruptedException {

        TestService testService = new TestService();

        TestServiceThread testServiceThread1 = new TestServiceThread(testService, "a");
        TestServiceThread testServiceThread2 = new TestServiceThread(testService, "b");

        testServiceThread1.start();
        testServiceThread2.start();

        testServiceThread1.join();
        testServiceThread2.join();

        Assertions.assertFalse(testService.sameKeyInProgressSimultaneously);
        Assertions.assertTrue(testService.differentKeysInProgressSimultaneously);
    }

    private class TestServiceThread extends Thread {
        TestService testService;
        String key;

        TestServiceThread(TestService testService, String key) {
            this.testService = testService;
            this.key = key;
        }

        @Override
        public void run() {
            testService.process(key);
        }
    }

    private class TestService {

        private final SameKeySynchronizer<String> sameKeySynchronizer = new SameKeySynchronizer<>();

        private Set<String> keysInProgress = ConcurrentHashMap.newKeySet();
        private boolean sameKeyInProgressSimultaneously = false;
        private boolean differentKeysInProgressSimultaneously = false;

        void process(String key) {
            if (SHOW_FAILING_TEST) {
                processInternal(key);
            } else {
                sameKeySynchronizer.serializeSameKeys(key, inputArgumentKey -> processInternal(inputArgumentKey));
            }
        }

        @SuppressWarnings("MagicNumber")
        private void processInternal(String key) {
            try {
                boolean keyInProgress = !keysInProgress.add(key);
                if (keyInProgress) {
                    sameKeyInProgressSimultaneously = true;
                }
                try {
                    int sleepTimeInMillis = 100;
                    for (long elapsedTimeInMillis = 0; elapsedTimeInMillis < 1000; elapsedTimeInMillis += sleepTimeInMillis) {
                        Thread.sleep(sleepTimeInMillis);
                        if (keysInProgress.size() > 1) {
                            differentKeysInProgressSimultaneously = true;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            } finally {
                keysInProgress.remove(key);
            }
        }
    }
}