多线程从文件中读取并执行runnable
Multi-threaded read from file and execute runnable
什么?
我正在尝试构建一个工具来读取文本文件并在进行一些字符串转换后发布文本。
怎么做?
该工具逐行读取文件并填充 LinkedBlockingQueue。同时我启动多个线程,然后每个线程从 LBQ 中获取一条消息,进行一些处理并发布它们。
主要
private static LinkedBlockingQueue<String> lbQueue = new LinkedBlockingQueue<>();
private static Boolean keepPublisherActive = Boolean.TRUE;
public static void main(String[] args) {
try {
tool.initMessagePublish();
tool.searchUsingScanner();
} catch (Exception ex) {
logger.error("Exception in Tool Main() " + ex.toString());
throw ex;
}
}
文件Reader
private void searchUsingScanner() {
Scanner scanner = null;
try {
scanner = new Scanner(new File(LOG_FILE_PATH));
while (scanner.hasNextLine()) {
String line = scanner.nextLine().trim();
if (StringUtils.isNotBlank(line)) {
lbQueue.offer(line);
}
}
} catch (Exception e) {
logger.error("Error while processing file: " + e.toString());
} finally {
try {
if (scanner != null) {
scanner.close();
}
// end thread execution
keepPublisherActive = false;
} catch (Exception e) {
logger.error("Exception while closing file scanner " + e.toString());
throw e;
}
}
}
多线程发布器
private void initMessagePublish() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
try {
while (keepPublisherActive || lbQueue.getSize() > 0) {
service.execute(messagePublisher); // messagePublisher implements Runnable
}
} catch (Exception ex) {
logger.error("Multi threaded message publish failed " + ex.toString());
throw ex;
} finally {
service.shutdown();
}
}
问题
首先调用 initMessagePublish() 的目的是发布者无需等待从文件中读取所有行就可以开始发布。一旦 LBQ 中有可用内容,它就会开始发布。
但是对于当前的实现,控件永远不会从 initMessagePublish 中出来并启动 searchUsingScanner。我该如何解决这个问题?基本上,这两个方法应该并行执行。
只需在新线程中启动 messagePublisher(Main class 中的第 5 行):
new Thread(()->tool.initMessagePublish()).start();
应该可以解决您的问题。
什么?
我正在尝试构建一个工具来读取文本文件并在进行一些字符串转换后发布文本。
怎么做?
该工具逐行读取文件并填充 LinkedBlockingQueue。同时我启动多个线程,然后每个线程从 LBQ 中获取一条消息,进行一些处理并发布它们。
主要
private static LinkedBlockingQueue<String> lbQueue = new LinkedBlockingQueue<>();
private static Boolean keepPublisherActive = Boolean.TRUE;
public static void main(String[] args) {
try {
tool.initMessagePublish();
tool.searchUsingScanner();
} catch (Exception ex) {
logger.error("Exception in Tool Main() " + ex.toString());
throw ex;
}
}
文件Reader
private void searchUsingScanner() {
Scanner scanner = null;
try {
scanner = new Scanner(new File(LOG_FILE_PATH));
while (scanner.hasNextLine()) {
String line = scanner.nextLine().trim();
if (StringUtils.isNotBlank(line)) {
lbQueue.offer(line);
}
}
} catch (Exception e) {
logger.error("Error while processing file: " + e.toString());
} finally {
try {
if (scanner != null) {
scanner.close();
}
// end thread execution
keepPublisherActive = false;
} catch (Exception e) {
logger.error("Exception while closing file scanner " + e.toString());
throw e;
}
}
}
多线程发布器
private void initMessagePublish() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
try {
while (keepPublisherActive || lbQueue.getSize() > 0) {
service.execute(messagePublisher); // messagePublisher implements Runnable
}
} catch (Exception ex) {
logger.error("Multi threaded message publish failed " + ex.toString());
throw ex;
} finally {
service.shutdown();
}
}
问题
首先调用 initMessagePublish() 的目的是发布者无需等待从文件中读取所有行就可以开始发布。一旦 LBQ 中有可用内容,它就会开始发布。
但是对于当前的实现,控件永远不会从 initMessagePublish 中出来并启动 searchUsingScanner。我该如何解决这个问题?基本上,这两个方法应该并行执行。
只需在新线程中启动 messagePublisher(Main class 中的第 5 行):
new Thread(()->tool.initMessagePublish()).start();
应该可以解决您的问题。