多线程:java中基于资源的动态对象锁定

Multithread: Dynamically object locking based on resource in hava

这里需要线程专家的眼睛...

我正在使用一个 poc 应用程序,我正在 FTP 服务器上上传文件

在FTP服务器中有多个文件夹。根据输入响应,我从文件夹中读取文件并移动到另一个文件夹

该应用程序可以同时被多个线程访问。

所以问题是这样的:

假设 FTP 有文件夹 Folder_A 和 A_A_FOLDER 现在 Folder_A 有 10 个文件。 一个线程从 FTP 中读取了 10 个文件并开始对其进行一些计算, 它一一计算然后移动到A_A_FOLDER 它在过程中(假设它成功地将 5 个文件从 Folder_A 移动到 A_A_FOLDER) 然后另一个线程来了,它选择了剩余的 5 个文件,因为它们被线程 1 处理不足,所以线程 2 也开始处理这 5 个文件

这里是重复文件问题

void m1(String folderName) {
// FTP related code
}

我已经通过使用 synchronized 关键字

解决了这个问题

现在一切都在同步,所有处理工作正常

synchronized void m1(String folderName) {
// code
}

文件夹名称决定需要处理的文件夹

现在我开始遇到性能问题

因为该方法是同步的,所以所有线程都将等待,直到处理线程未完成其任务。

我可以通过以下步骤改进它:

(在解决这个问题之前,有一些故事可以深入挖掘这个问题)

正如我提到的 m1 方法的 folderName 参数决定将处理哪个文件夹, 所以假设我在 Ftp 服务器中有 4 个文件夹(A、B、A_T、B_T),其中 2 个文件夹是需要从中读取数据的文件夹(A 和 B), 2 个文件夹是数据将移动的文件夹(A_T 和 B_T)

A_T 和 B_T 不是这里的关注点,因为它们对于每个文件夹 A 和 B 都是唯一的 因此,如果该方法将从 A 读取,那么它会将其移动到 A_T 与 B 相同(移动到 B_T)

现在:

假设 m1 方法有 4 个线程,文件夹 A 有 3 个线程,文件夹 B 有 1 个 如果以某种方式基于 fileName 参数同步请求以便我可以提高性能,这意味着 1 个线程将在 A 上工作,另外 2 个线程将阻塞,因为 fileName 对它们来说是相同的,所以它们将等到第一个线程未完成任务,线程 4 将并行无需任何锁定过程即可工作,因为它的文件名不同

那么我怎样才能在代码级别实现这个(在文件名上同步)呢?

注意:我知道我可以使用资源的静态锁定列表然后锁定文件名资源来打破这个逻辑 例如:

private final Object A = new Object();
private final Object B = new Object();

但这种方法的问题是文件夹可以动态添加,所以我不能这样做。

需要你们的帮助。

一种方法是为每个目录维护一个锁:

public class DirectoryTaskManager {
    public static void main(String[] args) throws IOException {
        DirectoryTaskManager manager = new DirectoryTaskManager();
        manager.withDirLock(new File("Folder_A"), () -> System.out.println("Doing something..."));
    }

    public void withDirLock(File dir, Runnable task) throws IOException {
        ReentrantLock lock = getDirLock(dir);
        lock.lock();
        try {
            task.run();
        } finally {
            lock.unlock();
        }
    }

    private Map<File, ReentrantLock> dirLocks = Collections.synchronizedMap(new HashMap<>());

    public ReentrantLock getDirLock(File dir) throws IOException {
        // Resolve the canonical file here so that different paths 
        // to the same file use the same lock
        File canonicalDir = dir.getCanonicalFile();
        if (!canonicalDir.exists() || !canonicalDir.isDirectory()) {
            throw new FileNotFoundException(canonicalDir.getName());
        }
        return dirLocks.computeIfAbsent(canonicalDir, d -> new ReentrantLock());
    }
}

感谢@teppic 和@OlegSklyar 的指导 最后这是完整的工作示例,

FolderImpl -> 具有可被多个线程访问的方法名称调用

我使用了 ConcurrentHashMap(读可以非常快,而写是用锁完成的。)它比 synchronizedMap 快一点点 它将保存文件夹名称和 ReentrantLock,因此锁定将对文件夹名称起作用

public class FolderImpl {
    private FolderImpl(){
        System.out.println("init................");
    }

    private ConcurrentHashMap<String, ReentrantLock> concurrentHashMap= new ConcurrentHashMap();
    private static final FolderImpl singleTon = new FolderImpl();
    public static FolderImpl getSingleTon() {
        return singleTon;
    }

    public void call(String name) throws Exception{
        ReentrantLock getDirLock = getDirLock(name);
        getDirLock.lock();
        try {
        for (int i = 0; i < 100; i ++) {
            System.out.println(i+":"+name+":"+Thread.currentThread().getName());
            try {
                Thread.sleep(30);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }}finally {
            getDirLock.unlock();
        }

    }

    public ReentrantLock getDirLock(String site)  {
        return concurrentHashMap.computeIfAbsent(site, d -> new ReentrantLock());
    }
}

TaskCaller线程调用call方法,这里是sleep flavor所以另一个线程可以git执行时间

public class TaskCaller extends Thread{
    public FolderImpl singleTon = FolderImpl.getSingleTon();
    public TaskCaller(String name) {
        super();
        this.name = name;
    }

    private String name;
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println("Name:"+Thread.currentThread().getName());

            try {
                singleTon.call(name);
                sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

TestExecution class 将执行 10 个线程进行测试

public class TestExecution {

    public static void main(String[] args) {
        TaskCaller testThreadCC = new TaskCaller("A_FOLDER");
        TaskCaller testThreadCC2 = new TaskCaller("A_FOLDER");
        TaskCaller testThreadCC3 = new TaskCaller("B_FOLDER");
        TaskCaller testThreadCC4 = new TaskCaller("C_FOLDER");
        TaskCaller testThreadCC5 = new TaskCaller("C_FOLDER");
        TaskCaller testThreadCC6 = new TaskCaller("C_FOLDER");
        TaskCaller testThreadCC7 = new TaskCaller("A_FOLDER");
        TaskCaller testThreadCC8 = new TaskCaller("A_FOLDER");
        TaskCaller testThreadCC9 = new TaskCaller("B_FOLDER");
        TaskCaller testThreadCC10 = new TaskCaller("B_FOLDER");

        testThreadCC.start();
        testThreadCC2.start();
        testThreadCC3.start();
        testThreadCC4.start();
        testThreadCC5.start();
        testThreadCC6.start();
        testThreadCC7.start();
        testThreadCC8.start();
        testThreadCC9.start();
        testThreadCC10.start();

    }

}