How should a thread lock be handled so that other threads wait until a file is downloaded, and then all of them can read the file at once?
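I am using an ExecutorService fixedThreadPool() to run TASKs.
A TASK here is defined as downloading a file from a specific URL and saving it to the database if it is not there yet, or otherwise just reading the file from the database. So it is more like a reader-writer problem: any thread of the executor's pool can act as the writer once, while the other threads become readers for subsequent requests.
I am using a semaphore for this, but the problem with this approach is that the subsequent read requests happen sequentially.
If 4 TASKs hit the same URL, I need to synchronize until the file is downloaded and the semaphore is released: any one of the 4 threads can acquire the lock while the remaining 3 wait. Once the download is complete, all 3 remaining threads should read the downloaded file concurrently. In my case, however, this last step happens sequentially, which hurts the project's performance.
With the use case above in mind, here is my sample code.
The Runnable below is passed to the ExecutorService to execute the task on the SharedObjectMT class: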
// Passed to the ExecutorService; each task loads one URL through the shared object.
class DownloadRunnable implements Runnable {
    private final SharedObjectMT sharedObject;
    private final String url;

    DownloadRunnable(SharedObjectMT sharedObject, String url) {
        this.sharedObject = sharedObject;
        this.url = url;
    }

    @Override
    public void run() {
        try {
            sharedObject.loadFile(url);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}

class SharedObjectMT {
    // This HashMap acts like a ConcurrentHashMap mapping each URL to a semaphore, so
    // multiple threads requesting the same URL are only synchronized on their
    // corresponding semaphore, while threads requesting different URLs
    // run concurrently.
    private static final HashMap<String, Semaphore> syncMap = new HashMap<>();
    // .....

    void loadFile(String url) throws InterruptedException {
        // Let all threads enter sequentially and try to assign a Semaphore for their URL in the
        // map. Only a URL that has never been requested gets a new Semaphore (say S1);
        // all other threads with the *same request URL* use that Semaphore (S1)
        // to handle concurrency. The lookup stays inside the lock because HashMap is not thread-safe.
        final Semaphore semaphore;
        synchronized (syncMap) {
            semaphore = syncMap.computeIfAbsent(url, u -> new Semaphore(1));
        }
        synchronized (semaphore) {
            // ............
            semaphore.acquire();
            String filePath = findInDatabase(url);
            if (filePath != null) {
                semaphore.release(); // No need to hold the semaphore: the file is already downloaded.
                printStatus("Already downloaded file from url = " + url);
            } else {
                // DownloadThread is a mock of my real project, where a third-party
                // library uses its own thread to download the file.
                new DownloadThread(() -> {
                    printStatus("Download completed for url = " + url + ". Releasing semaphore.");
                    semaphore.release();
                }).start();
                // .............
            }
        }
    }
}
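I know a single semaphore won't help me here. Maybe we could use a semaphore to separate read and write locks, or some other locking mechanism. So I need some help understanding how to implement this kind of one-shot synchronization.
Note: Please ignore any syntax errors in the code above. The actual project is in Kotlin, but since this is a basic Java multithreading problem, I posted it as Java code.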
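For reference, the one-shot gate described in the question can be sketched with java.util.concurrent.CountDownLatch instead of a semaphore (a swapped-in technique, not the asker's code). The first thread to register a latch for a URL becomes the single writer; every other thread calls await(), so a single countDown() releases all readers at the same moment instead of one at a time. The in-memory "database", the simulated download() and the path format are hypothetical stand-ins. This sketch does not retry a failed download: readers would simply get null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchGate {
    // One latch per URL; a count of 1 means "the download has not finished yet".
    private final ConcurrentHashMap<String, CountDownLatch> gates = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the database: URL -> local file path.
    private final ConcurrentHashMap<String, String> database = new ConcurrentHashMap<>();
    private final AtomicInteger downloads = new AtomicInteger();

    public String loadFile(String url) throws InterruptedException {
        CountDownLatch mine = new CountDownLatch(1);
        CountDownLatch existing = gates.putIfAbsent(url, mine);
        if (existing == null) {
            // This thread registered the latch first, so it is the single writer.
            try {
                database.put(url, download(url));
            } finally {
                mine.countDown(); // opens the gate for every waiting reader at once
            }
        } else {
            // Readers block here until the writer counts down, then all proceed concurrently.
            existing.await();
        }
        // Returns null if the download failed (no retry in this sketch).
        return database.get(url);
    }

    public int downloadCount() {
        return downloads.get();
    }

    // Hypothetical download: simulated with a short sleep.
    private String download(String url) throws InterruptedException {
        downloads.incrementAndGet();
        Thread.sleep(200);
        return "/downloads/" + url.length();
    }

    public static void main(String[] args) throws Exception {
        LatchGate gate = new LatchGate();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            results.add(pool.submit(() -> gate.loadFile("http://example.com/file")));
        }
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
        pool.shutdown();
        System.out.println("downloads = " + gate.downloadCount());
    }
}
```

With 4 tasks for the same URL, the download should run once and all 4 tasks should see the same path.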
I don't know much about Kotlin, but I can demonstrate it in Java:
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead {

    // Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }

    public static class DownLoadCallable implements Callable<String> {
        private final String url;

        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }

        @Override
        public String call() throws IOException, InterruptedException {
            /* Use the url property here to download the file...
               In our case, just simulate a download delay. */
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            // Return the file's local path...
            return randomString(20); // In our case, a random String of 20 characters.
        }
    }

    // This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, // Can be shared between calls of loadPath...
                                  final HashMap<String, Future<String>> urlToFuture, // MUST be shared between calls of loadPath!
                                  final String url) // The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url)) // If nowhere to be seen...
                urlToFuture.put(url, executorService.submit(new DownLoadCallable(url))); // Create a Future...
            future = urlToFuture.get(url); // Obtain the Future (new or old).
        }
        return future.get(); // Outside the synchronized block!
    }

    public static void main(final String[] args) {
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        System.out.println("Creating shared map...");
        final HashMap<String, Future<String>> urlToFuture = new HashMap<>();
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(20), randomString(15)};
        try {
            System.out.println("Downloading files sequentially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i)
                System.out.println(loadPath(executorService, urlToFuture, urls[rand.nextInt(urls.length)]));
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
        finally {
            // Stops the pool even if a download failed (no-op if it has already terminated).
            executorService.shutdownNow();
        }
    }
}
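The whole idea is to submit a Callable that handles the download to the ExecutorService. We then use the Futures returned by the submit method to obtain the desired result (path/file/anything): just call get on the relevant Future object. The only thing you need to synchronize is the Map from URL to Future.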
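When you run this test program, you will notice that the first request for a file blocks until the file is downloaded; subsequent calls for the same URL then complete immediately (because that URL has already been downloaded), and we only block on each new URL that has not been downloaded yet. In this case I used only 3 random URLs, each taking 5 to 15 seconds to complete, which gives a total run time of roughly 15 to 45 seconds, since we download them sequentially.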
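That is all there is to the loadPath method. Note, however, that in the sample code above the files are downloaded sequentially, because main calls loadPath from a single thread. If you also need multiple Threads for downloading, you can call loadPath from multiple Threads (no further synchronization is needed anywhere other than the shared Map).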
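A sketch of calling loadPath from several threads at once, reproducing the question's scenario of 4 TASKs hitting the same URL. It keeps the answer's synchronized-HashMap shape; the simulatedDownload callable, the DOWNLOADS counter and the URL are hypothetical. One design point: the callers run on a separate pool from the downloads. A caller blocked in get() holds its thread, so if callers and downloads shared a small fixed pool, every thread could end up waiting on a download that never gets a thread to run on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentLoadPath {
    // Counts how many times the (simulated) download actually runs.
    static final AtomicInteger DOWNLOADS = new AtomicInteger();

    // Hypothetical stand-in for DownLoadCallable: short sleep, deterministic "path".
    static Callable<String> simulatedDownload(String url) {
        return () -> {
            DOWNLOADS.incrementAndGet();
            Thread.sleep(300);
            return "/downloads/" + url.length();
        };
    }

    // Same shape as the answer's loadPath: only the map access is synchronized.
    static String loadPath(ExecutorService downloader, HashMap<String, Future<String>> urlToFuture, String url)
            throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url))
                urlToFuture.put(url, downloader.submit(simulatedDownload(url)));
            future = urlToFuture.get(url);
        }
        return future.get(); // waiting happens outside the lock, so callers wait in parallel
    }

    public static void main(String[] args) throws Exception {
        ExecutorService downloader = Executors.newFixedThreadPool(2);
        // Separate pool for callers: each caller blocks in get() while holding its thread.
        ExecutorService callers = Executors.newFixedThreadPool(4);
        HashMap<String, Future<String>> urlToFuture = new HashMap<>();
        List<Future<String>> paths = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            paths.add(callers.submit(() -> loadPath(downloader, urlToFuture, "http://example.com/file")));
        }
        for (Future<String> path : paths) {
            System.out.println(path.get());
        }
        System.out.println("downloads = " + DOWNLOADS.get());
        callers.shutdown();
        downloader.shutdown();
    }
}
```

All 4 callers should receive the same path, and the simulated download should run only once.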
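As described in this answer here, calling get on the same Future after the operation has completed always yields the same object, or, if the operation failed, throws an ExecutionException wrapping the same cause. This is what the code in this post takes advantage of.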
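A small check of the Future.get behavior relied on above, using the Futures that submit returns from a standard executor (FutureTask instances). The method names and the simulated failure are illustrative only. A StringBuilder is used as the result so that the identity comparison is meaningful.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetTwice {
    // A completed Future returns the very same result object on every get().
    static boolean sameResultOnRepeatedGet(ExecutorService pool) throws Exception {
        Callable<StringBuilder> download = () -> new StringBuilder("/downloads/file.bin");
        Future<StringBuilder> ok = pool.submit(download);
        return ok.get() == ok.get();
    }

    // A failed Future throws a new ExecutionException on each get(), wrapping the same cause.
    static boolean sameCauseOnRepeatedGet(ExecutorService pool) throws Exception {
        Callable<String> failingDownload = () -> {
            throw new IOException("simulated download failure");
        };
        Future<String> failed = pool.submit(failingDownload);
        return causeOf(failed) == causeOf(failed);
    }

    private static Throwable causeOf(Future<?> future) throws InterruptedException {
        try {
            future.get();
            throw new IllegalStateException("expected the task to fail");
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("same result: " + sameResultOnRepeatedGet(pool));
            System.out.println("same cause: " + sameCauseOnRepeatedGet(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

A practical consequence is that a failed download stays in the map, and every later call for that URL rethrows the same failure instead of retrying.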
Edit 1:
Or better, as @drekbour pointed out in the comments, use computeIfAbsent with a ConcurrentHashMap to do the job, as follows:
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead1 {

    // Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }

    public static class DownLoadCallable implements Callable<String> {
        private final String url;

        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }

        @Override
        public String call() throws InterruptedException {
            System.out.println("Downloading " + url + "...");
            /* Use the url property here to download the file...
               In our case, just simulate a download delay. */
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            System.out.println("Downloaded " + url + '.');
            // Return the file's local path...
            return randomString(20); // In our case, a random String of 20 characters.
        }
    }

    // This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, // Can be shared between calls of loadPath...
                                  final ConcurrentHashMap<String, Future<String>> urlToFuture, // MUST be shared between calls of loadPath!
                                  final String url) // The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        // computeIfAbsent submits at most one download per URL; get() then blocks outside the map's lock.
        return urlToFuture.computeIfAbsent(url, url2 -> executorService.submit(new DownLoadCallable(url2))).get();
    }

    public static void main(final String[] args) {
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        System.out.println("Creating shared Map...");
        final ConcurrentHashMap<String, Future<String>> urlToFuture = new ConcurrentHashMap<>();
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(10), randomString(10)};
        try {
            System.out.println("Downloading files sequentially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i) {
                final String url = urls[rand.nextInt(urls.length)];
                System.out.println("Path for " + url + ": " + loadPath(executorService, urlToFuture, url));
            }
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
        finally {
            // Stops the pool even if a download failed (no-op if it has already terminated).
            executorService.shutdownNow();
        }
    }
}
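The Callable approach assumes the download can run as a task on the executor. In the question, however, a third-party library downloads on its own thread and reports completion through a callback (the DownloadThread mock). For that case, here is a sketch of the same one-Future-per-URL idea using CompletableFuture (a swapped-in choice, not part of the answer above): the callback completes the future, and every thread waiting in join() is released at the same time. startDownload is a hypothetical stand-in for the library. Evicting failed futures so that a later call can retry is an added design choice; without it, a failed download stays cached in the map.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class CallbackDownloads {
    private final ConcurrentHashMap<String, CompletableFuture<String>> urlToPath = new ConcurrentHashMap<>();
    private final AtomicInteger started = new AtomicInteger();

    // Hypothetical stand-in for the third-party library: it downloads on its own thread
    // and reports the outcome through callbacks, like the question's DownloadThread mock.
    private void startDownload(String url, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        started.incrementAndGet();
        new Thread(() -> {
            try {
                Thread.sleep(200); // simulated network delay
                onSuccess.accept("/downloads/" + url.length());
            } catch (InterruptedException e) {
                onFailure.accept(e);
            }
        }).start();
    }

    public String loadPath(String url) {
        // Only the first caller for a URL creates the future and starts the download.
        CompletableFuture<String> future = urlToPath.computeIfAbsent(url, u -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            startDownload(u, f::complete, f::completeExceptionally);
            return f;
        });
        // Evict a failed download so that a later call can retry it.
        future.whenComplete((path, error) -> {
            if (error != null) {
                urlToPath.remove(url, future);
            }
        });
        // join() blocks until complete() runs, then releases all waiting callers together.
        return future.join();
    }

    public int downloadsStarted() {
        return started.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackDownloads downloads = new CallbackDownloads();
        Thread[] callers = new Thread[4];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> System.out.println(downloads.loadPath("http://example.com/file")));
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println("downloads started = " + downloads.downloadsStarted());
    }
}
```

The eviction callback is registered outside computeIfAbsent on purpose: modifying the same ConcurrentHashMap from inside its own mapping function is not allowed.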