应该如何处理线程锁以保持其他线程等待直到下载文件,然后允许所有线程一次性读取文件

How thread lock should be handled to keep other threads waiting until downloading a file and then allow all threads to read the file in one go

我正在使用 ExecutorService fixedThreadPool() 来 运行 TASK.

A TASK 这里定义为从特定的 URL 下载文件,如果不存在则保存到数据库,否则从中读取文件仅数据库。所以它更像是一个 reader-writer 问题,其中执行程序线程池的任何线程都可以充当一次 writer,而其他线程将成为后续请求的 reader。

我正在使用信号量来执行此操作,但此方法的问题是后续读取请求是按顺序发生的。

如果 4 TASKs 打算命中相同的 URL 我需要同步直到文件被下载并且信号量被释放,即在 4 个线程中任何人可以获得锁,其余 3 个正在等待。下载完成后,所有剩余的 3 个线程应同时读取下载的文件。但在我的案例中,这最后一步是按顺序发生的,这也会对项目绩效产生影响。

说了上面的用例,下面是我的示例代码:

下面的 Runnable 被传递给 ExecutorService 以在 SharedObject class.

上执行任务
 class DownloadRunnable(SharedObjectMT sharedObject, String url) implement Runnable {
    void run() {
        sharedObject.loadFile(url);
    }
 }
class SharedObjectMT {
    // This Hashmap acts ConcurrentHashMap with URL and semaphore mapping. So
        // multiple threads requesting for the same URL will only be synchronized on their
        // corresponding semaphore. And threads requesting for different URLs 
        // will run concurrently.

    private static HashMap<String, Semaphore> syncMap = new HashMap<>();
    .....
    void loadFile(String url) {
        
        // Let all threads enter sequentially and try to assign a Semaphore for their url in the 
        // hashmap. If the url has never been requested, then only a Semaphore(say S1) will be 
        // assigned to that url. And for all the other threads with *same request url*, this 
        // Semaphore(S1) will be used to handle concurrency.

        synchronized(syncMap) {
             if(syncMap[url] == null) {
                syncMap[url] = new Semaphore(1);
            }
        }
        
        Semaphore semaphore = syncMap[url];

        synchronized(semaphore) {
            ............
            ............
            semaphore.acquire();
            String filePath = findInDatabase(url);
            if(filePath != null) {
                semaphore.release(); // no need to hold semaphore since file already downloaded.
                printStatus("Already downloaded file from url = "+url);
            } else {
                // This DownloadThread is actually a mock of my real project where a third-party 
                // library uses a thread to download the file.

                DownloadThread(() -> {
                    printStatus("Download completed for url= "+ url +". Releasing semaphore.");
                    semaphore.release();
                }).start();
                .............
                .............
            }
        }
    }
}

我知道单个信号量帮不了我。也许我们可以使用 1 个信号量来区分读写锁或任何其他锁定机制。因此需要一些帮助来了解如何使用这种类型的一次性同步

Note: Please ignore if you find any syntax error in the above code since the actual project is in Kotlin but this is a basic Java multithreading problem so I posted it as a Java code.

我对 Kotlin 不太了解,但我可以在 Java:

中演示
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead {
        
    //Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }
    
    public static class DownLoadCallable implements Callable<String> {
        private final String url;
        
        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }
        
        @Override
        public String call() throws IOException, InterruptedException {
            
            /*Utilize url property here to download the file...
            In our case, just simulate a download delay supposedly...*/
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            
            //Return the file's local path...
            return randomString(20); //In our case, a random String of 20 characters.
        }
    }
    
    //This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
                                  final HashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
                                  final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url)) //If nowhere to be seen...
                urlToFuture.put(url, executorService.submit(new DownLoadCallable(url))); //Create a Future...
            future = urlToFuture.get(url); //Obtain the Future (new or old).
        }
        return future.get(); //Outside the synchronized block!
    }
    
    public static void main(final String[] args) {
        
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        System.out.println("Creating shared map...");
        final HashMap<String, Future<String>> urlToFuture = new HashMap<>();
        
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(20), randomString(15)};
        
        try {
            System.out.println("Downloading files sequencially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i)
                System.out.println(loadPath(executorService, urlToFuture, urls[rand.nextInt(urls.length)]));
            
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
    }
}

整个想法是将 Callable 提交给处理下载的 ExecutorService。我们还利用 submit 方法返回的 Futures 来获得所需的结果 path/file/anything。只需在所需的 Future 对象上调用 get 即可。您唯一需要同步的是 URL 到 FutureMap

您会注意到,当 运行 这个测试程序时,第一个文件在下载之前一直处于阻塞状态,然后对相同 URL 的后续调用立即完成(因为 URL已经下载)并且我们只阻止每个新的 URL(尚未下载)。在这种情况下,我只使用了 3 个随机 URLs,每个 URL 需要 5 到 15 秒才能完成,这给了我们大约 15 到 45 秒的正常运行时间,因为我们按顺序下载它们。

loadPath 方法到此结束。但是在上面的示例代码中,文件是按顺序下载的。如果您还需要多个 Thread 用于下载,您可以从多个 Thread 调用 loadPath(除了共享的 Map 之外不需要在其他地方进一步同步)。

正如在 this answer here 中所读到的,似乎在操作完成后调用相同 Futureget 方法,将始终产生相同的对象或抛出相同的对象Exception 如果失败。这是我们在 post.

提供的代码中使用的优势

编辑 1:

或者更好,正如@drekbour 在评论中指出的那样,使用 computeIfAbsentConcurrentHashMap 来完成工作,如下所示:

import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead1 {
    
    //Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }
    
    public static class DownLoadCallable implements Callable<String> {
        private final String url;
        
        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }
        
        @Override
        public String call() throws InterruptedException {
            
            System.out.println("Downloading " + url + "...");
            
            /*Utilize url property here to download the file...
            In our case, just simulate a download delay supposedly...*/
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            
            System.out.println("Downloaded " + url + '.');
            
            //Return the file's local path...
            return randomString(20); //In our case, a random String of 20 characters.
        }
    }
    
    //This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
                                  final ConcurrentHashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
                                  final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        return urlToFuture.computeIfAbsent(url, url2 -> executorService.submit(new DownLoadCallable(url2))).get();
    }
    
    public static void main(final String[] args) {
        
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        System.out.println("Creating shared Map...");
        final ConcurrentHashMap<String, Future<String>> urlToFuture = new ConcurrentHashMap<>();
        
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(10), randomString(10)};
        
        try {
            System.out.println("Downloading files sequencially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i) {
                final String url = urls[rand.nextInt(urls.length)];
                System.out.println("Path for " + url + ": " + loadPath(executorService, urlToFuture, url));
            }
            
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
    }
}