Multithreading multiple PDF files
So I am trying to run multiple PDF files through a function that scrapes the text, compares it against a static dictionary, and then adds the relational data to an index in a MySQL table. I looked into multithreading, but I am not sure whether it achieves what I need.
Here is the for loop I use to iterate over all of the PDF files:
for (String temp : files) {
    //addToDict(temp, dictonary, conn);
    //new Scraper(temp, dictonary, conn).run();
    Scraper obj = new Scraper(temp, dictonary, conn);
    Thread t1 = new Thread(obj);
    t1.start();
    //System.out.println((ammountOfFiles--) + " files left");
}
Here is the Scraper class I created, which implements Runnable:
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.xml.sax.SAXException;

public class Scraper implements Runnable {

    private final String filePath;
    private final HashMap<String, Integer> map;
    private final Connection conn;

    public Scraper(String filePath, HashMap<String, Integer> dict, Connection connection) {
        // store parameters for later use
        this.filePath = filePath;
        this.map = dict;
        this.conn = connection;
    }

    @Override
    public void run() {
        // cut the file path so it starts from the Data folder
        int cutPos = filePath.indexOf("Data");
        String cutPath = filePath.substring(cutPos);
        // use replace(), which works on literal strings; replaceAll("\\", "|")
        // throws a PatternSyntaxException because a lone backslash is an invalid regex
        cutPath = cutPath.replace("\\", "|");
        System.out.println(cutPath + " being scraped");

        // queries
        String addSentanceQuery = "INSERT INTO sentance(sentance_ID,sentance_Value) VALUES(Default,?)";
        String addContextQuery = "INSERT INTO context(context_ID,word_ID,sentance_ID,pdf_path) VALUES(Default,?,?,?)";

        BodyContentHandler handler = new BodyContentHandler(-1);
        Metadata metadata = new Metadata();
        ParseContext pcontext = new ParseContext();
        PDFParser pdfparser = new PDFParser();

        // parse the document with the PDF parser; try-with-resources closes the
        // stream even on failure, and returning early avoids carrying on with a
        // stream that was never opened or never parsed
        try (FileInputStream inputstream = new FileInputStream(filePath)) {
            pdfparser.parse(inputstream, handler, metadata, pcontext);
        } catch (IOException | SAXException | TikaException ex) {
            Logger.getLogger(Scraper.class.getName()).log(Level.SEVERE, null, ex);
            return;
        }

        // get the content of the document and split it into lines
        String fileText = handler.toString().toLowerCase();
        String[] sentances = fileText.split("\n");
        for (String x : sentances) {
            x = x.trim();
            if (x.isEmpty()) { // trim() has already stripped tabs and newlines
                continue;
            }
            int sentanceID = 0;
            // add the sentence to the db and fetch its generated id
            try (PreparedStatement addSentancePrepare =
                         conn.prepareStatement(addSentanceQuery, Statement.RETURN_GENERATED_KEYS)) {
                addSentancePrepare.setString(1, x);
                addSentancePrepare.executeUpdate();
                try (ResultSet sentanceKeyRS = addSentancePrepare.getGeneratedKeys()) {
                    while (sentanceKeyRS.next()) {
                        sentanceID = sentanceKeyRS.getInt(1);
                    }
                }
            } catch (SQLException ex) {
                Logger.getLogger(Scraper.class.getName()).log(Level.SEVERE, null, ex);
            }
            String[] words = x.split(" ");
            for (String y : words) {
                y = y.trim();
                // note the doubled backslash: "\s+" is not a valid Java string escape
                if (y.isEmpty() || y.matches("\\s+")) {
                    continue;
                }
                if (map.containsKey(y)) {
                    // look up the word id and insert a row into the join table
                    try (PreparedStatement addContextPrepare = conn.prepareStatement(addContextQuery)) {
                        addContextPrepare.setInt(1, map.get(y));
                        addContextPrepare.setInt(2, sentanceID);
                        addContextPrepare.setString(3, cutPath);
                        addContextPrepare.executeUpdate();
                    } catch (SQLException ex) {
                        Logger.getLogger(Scraper.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }
}
Am I doing this correctly? I have never used multithreading before, but it seems like it could speed up my program.
You have the basic model of the program in place, and conceptually you are almost correct. There are a few concerns, though.
- Scalability
You should not keep increasing the number of threads as you process more files. Although adding more concurrent workers should, intuitively, improve performance, in the real world it often does not: once the thread count grows beyond a certain level (which depends on various parameters), performance actually degrades, due to thread contention, communication overhead, and memory usage. So I suggest you use the ThreadPool implementation that ships with the java.util.concurrent package. See my modification of your code below.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    private final ThreadPoolExecutor threadPoolExecutor;

    public Test(int coreSize, int maxSize) {
        // bounded queue of 100 tasks; with the default handler, submit() throws
        // RejectedExecutionException once the queue and all maxSize threads are busy
        this.threadPoolExecutor = new ThreadPoolExecutor(coreSize, maxSize,
                50, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
    }

    public void submit(String[] files) {
        // dictonary and conn are the same references as in your original loop
        for (String temp : files) {
            //addToDict(temp, dictonary, conn);
            //new Scraper(temp, dictonary, conn).run();
            Scraper obj = new Scraper(temp, dictonary, conn);
            threadPoolExecutor.submit(obj);
            //System.out.println((ammountOfFiles--) + " files left");
        }
    }

    public void shutDown() {
        this.threadPoolExecutor.shutdown();
    }
}
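A rough usage sketch (the pool sizes and file paths below are placeholders to be replaced with your own values):

public class Main {
    public static void main(String[] args) {
        // pool sizes are illustrative; tune them to your CPU count and workload
        Test test = new Test(4, 8);
        String[] files = {"Data\\a.pdf", "Data\\b.pdf"}; // placeholder paths
        test.submit(files);
        // shutdown() stops accepting new tasks; already-submitted tasks still finish
        test.shutDown();
    }
}

If you need to block until every file has been processed, ThreadPoolExecutor also offers awaitTermination(timeout, unit), which you could expose from Test alongside shutDown().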
- Thread safety and synchronization
I can see that you have shared a single java.sql.Connection instance across threads. Although java.sql.Connection is thread-safe, this usage will significantly degrade your application's performance, because java.sql.Connection achieves thread safety through synchronization, so only one thread can use the connection at a time. To overcome this, you can apply the Connection Pooling concept. A simple implementation I can suggest is Apache Commons DBCP.
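A minimal sketch of that idea with Commons DBCP 2 (the JDBC URL, credentials, database name, and pool size here are placeholder assumptions):

import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.dbcp2.BasicDataSource;

public class Database {

    private static final BasicDataSource DATA_SOURCE = new BasicDataSource();

    static {
        // placeholder connection details; point these at your own MySQL instance
        DATA_SOURCE.setUrl("jdbc:mysql://localhost:3306/pdf_index");
        DATA_SOURCE.setUsername("user");
        DATA_SOURCE.setPassword("password");
        DATA_SOURCE.setMaxTotal(8); // roughly match the executor's maximum thread count
    }

    public static Connection getConnection() throws SQLException {
        // borrows a connection from the pool; closing it returns it to the pool
        return DATA_SOURCE.getConnection();
    }
}

Each Scraper can then borrow its own connection at the start of run(), for example with try (Connection conn = Database.getConnection()), so that closing it hands it back to the pool instead of every thread contending for one shared Connection.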