Java线程池复杂约束数据处理
Java thread pooled complex constraints data processing
我有 Java EE(桌面)应用程序,它必须处理由多个来源(最多 200 个不同来源)生成的数据文件。每个来源定期生成 具有唯一名称 的数据文件,其中还包含该来源的唯一 ID。
我需要创建一个包含 15 个线程的线程池,用于处理和删除具有这些约束的文件:
- 多线程无法同时处理来自同一源的文件。
- 来自同一来源的多个文件应按照其创建时间戳的顺序进行处理。
- 不可能与文件生成器源同步,因此这意味着下一个文件可能由源生成,同时它的前一个文件正在处理或计划处理。
- 由于性能原因应该多线程处理(单线程处理不够,所以我打算使用 10-15 个线程)。
- 一次文件处理操作可能耗时3-15秒。
欢迎就池中线程的这种复杂同步的体系结构提出任何建议。
P.S。由于同时处理的限制,我之前在更简单的情况下使用的设计,即使用 ArrayBlockingQueue 不适合这种情况。
总体思路:
每个源都有一个任务队列。
你有一个中央队列,它实际上是一个由所有工作线程共享的任务队列队列。
为每个来源创建一个任务队列。并且您将这些任务队列放在基于唯一 ID 的哈希表中。通过这种方式,您可以保证按顺序处理来自同一来源的任务(要求 2)。
如果收到任务,则在哈希表中查找(或创建)任务队列并将任务添加到任务队列。如果它是第一个添加到队列中的任务,您也将其添加到中央队列中。
然后有一堆工作线程从这个中央队列中获取任务队列,然后从他们刚刚获取的任务队列中获取单个任务并处理该任务。完成任务后,他们需要决定是否需要将任务队列重新插入中央队列。
有几个部分很容易出错:
您不希望任务队列被多次插入到中央队列中。那将违反您的第一个要求。
您不希望任务队列不被重新插入中央队列,即使任务可用也是如此。
因此您需要注意适当的同步,这可能比您最初想象的要复杂一些。但是看到任务很长 运行 的事实,我会从一个常规的互斥体开始(可能是每个任务队列),例如synchronized 或锁,不用担心让它成为非阻塞。
所以这是我为解决我的问题而创建的 class 的 skeleton/tester。有关这里发生的事情的一些详细信息,请参阅@pveentjer 的回答。
package org.zur.test;
import java.io.File;
import java.util.Comparator;
import java.util.HashMap;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PmFileProcessor {
private static final int THREADS_COUNT = 15;
ArrayBlockingQueue<File> scheduledFiles = new ArrayBlockingQueue<>(10000, false);
HashMap<String, PmFileProcessingJob> allJobs = new HashMap<>();
ScheduledExecutorService jobsExecutorService = Executors.newScheduledThreadPool(THREADS_COUNT);
public PmFileProcessor() {
super();
SourceJobsManager fc = new SourceJobsManager();
fc.setDaemon(true);
fc.start();
}
public void scheduleFile(File f) {
try {
scheduledFiles.add(f);
} catch (Exception e) {
// TODO: handle exception
}
}
/**
* Assigns files to file source processing job.
*
* @author
* <ul>
* <li>Zur13</li>
* </ul>
*
*/
public class SourceJobsManager extends Thread {
@Override
public void run() {
// assigns scheduled files to per-source jobs and schedules job for additional execution
while ( true ) {
try {
File f = scheduledFiles.take();
PmFileProcessingJob job = getSourceJob(f);
job.scheduleSourceFile(f);
jobsExecutorService.execute(job); // schedules job execution
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO: check disk space periodically
}
}
/**
* Finds existing job for file source or creates a new one.
*
* @param f
* @return
*/
private PmFileProcessingJob getSourceJob(File f) {
// TODO: test code
String fname = f.getName();
String[] parts = fname.split("_");
String uid = parts[0];
PmFileProcessingJob res = allJobs.get(uid);
if ( res == null ) {
res = new PmFileProcessingJob(uid);
allJobs.put(uid, res);
}
return res;
}
}
/**
* Process first file from scheduledSourceFiles queue (i.e. each job execution processes a single file or
* reschedules itself for later execution if another thread already processes the file from the same source).
*
* @author
* <ul>
* <li>Zur13</li>
* </ul>
*
*/
public class PmFileProcessingJob implements Runnable {
public final String fileSourceUidString;
PriorityQueue<File> scheduledSourceFiles = new PriorityQueue<>(1000, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
// TODO Auto-generated method stub
return 0;
}
});
Semaphore onePassSemaphore = new Semaphore(1);
public PmFileProcessingJob(String fileSourceUid) {
super();
this.fileSourceUidString = fileSourceUid;
}
/**
* Schedules file from for processing by this job.
*/
public void scheduleSourceFile(File f) {
scheduledSourceFiles.add(f);
}
@Override
public void run() {
File f = null;
if ( scheduledSourceFiles.size() > 0 ) { // fail fast optimization 1
if ( onePassSemaphore.tryAcquire() ) { // fail fast optimization 2
try {
f = scheduledSourceFiles.poll();
if ( f != null ) {
// TODO: process the file
try {
System.err.println(f.getName() + "\t" + Thread.currentThread().getId());
Thread.sleep(1000);
return;
} catch (Exception e) {
// TODO: handle exception
return; // prevents reschedule loop for failing files
}
} else {
// scheduledSourceFiles queue is empty
return;
}
} finally {
onePassSemaphore.release();
}
}
if ( f == null && scheduledSourceFiles.size() > 0 ) {
// this thread did not process the scheduled file because another thread holds the critical section
// pass
// this thread should reschedule this Job to release this thread and try to process this job later
// with another thread
// reschedule the job with 4 seconds delay to prevent excess CPU usage
// System.err.println("RESCHEDULE");
jobsExecutorService.schedule(this, 3, TimeUnit.SECONDS);
}
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((this.fileSourceUidString == null) ? 0 : this.fileSourceUidString.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if ( this == obj )
return true;
if ( !(obj instanceof PmFileProcessingJob) )
return false;
PmFileProcessingJob other = (PmFileProcessingJob) obj;
if ( this.fileSourceUidString == null ) {
if ( other.fileSourceUidString != null )
return false;
} else if ( !this.fileSourceUidString.equals(other.fileSourceUidString) )
return false;
return true;
}
}
public static void main(String[] args) {
PmFileProcessor fp = new PmFileProcessor();
fp.unitTest();
}
private void unitTest() {
// TODO Auto-generated method stub
int filesCount = 1000;
for (int i = 0; i < filesCount; i++) {
int sourceUid = ThreadLocalRandom.current().nextInt(1, 30);
File f = new File(sourceUid + "_" + i);
scheduleFile(f);
}
Thread.yield();
try {
Thread.sleep(999000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我有 Java EE(桌面)应用程序,它必须处理由多个来源(最多 200 个不同来源)生成的数据文件。每个来源定期生成 具有唯一名称 的数据文件,其中还包含该来源的唯一 ID。
我需要创建一个包含 15 个线程的线程池,用于处理和删除具有这些约束的文件:
- 多线程无法同时处理来自同一源的文件。
- 来自同一来源的多个文件应按照其创建时间戳的顺序进行处理。
- 不可能与文件生成器源同步,因此这意味着下一个文件可能由源生成,同时它的前一个文件正在处理或计划处理。
- 由于性能原因应该多线程处理(单线程处理不够,所以我打算使用 10-15 个线程)。
- 一次文件处理操作可能耗时3-15秒。
欢迎就池中线程的这种复杂同步的体系结构提出任何建议。
P.S。由于同时处理的限制,我之前在更简单的情况下使用的设计,即使用 ArrayBlockingQueue 不适合这种情况。
总体思路:
每个源都有一个任务队列。
你有一个中央队列,它实际上是一个由所有工作线程共享的任务队列队列。
为每个来源创建一个任务队列。并且您将这些任务队列放在基于唯一 ID 的哈希表中。通过这种方式,您可以保证按顺序处理来自同一来源的任务(要求 2)。
如果收到任务,则在哈希表中查找(或创建)任务队列并将任务添加到任务队列。如果它是第一个添加到队列中的任务,您也将其添加到中央队列中。
然后有一堆工作线程从这个中央队列中获取任务队列,然后从他们刚刚获取的任务队列中获取单个任务并处理该任务。完成任务后,他们需要决定是否需要将任务队列重新插入中央队列。
有几个部分很容易出错:
您不希望任务队列被多次插入到中央队列中。那将违反您的第一个要求。
您不希望任务队列不被重新插入中央队列,即使任务可用也是如此。
因此您需要注意适当的同步,这可能比您最初想象的要复杂一些。但是看到任务很长 运行 的事实,我会从一个常规的互斥体开始(可能是每个任务队列),例如synchronized 或锁,不用担心让它成为非阻塞。
所以这是我为解决我的问题而创建的 class 的 skeleton/tester。有关这里发生的事情的一些详细信息,请参阅@pveentjer 的回答。
package org.zur.test;
import java.io.File;
import java.util.Comparator;
import java.util.HashMap;
import java.util.PriorityQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PmFileProcessor {
private static final int THREADS_COUNT = 15;
ArrayBlockingQueue<File> scheduledFiles = new ArrayBlockingQueue<>(10000, false);
HashMap<String, PmFileProcessingJob> allJobs = new HashMap<>();
ScheduledExecutorService jobsExecutorService = Executors.newScheduledThreadPool(THREADS_COUNT);
public PmFileProcessor() {
super();
SourceJobsManager fc = new SourceJobsManager();
fc.setDaemon(true);
fc.start();
}
public void scheduleFile(File f) {
try {
scheduledFiles.add(f);
} catch (Exception e) {
// TODO: handle exception
}
}
/**
* Assigns files to file source processing job.
*
* @author
* <ul>
* <li>Zur13</li>
* </ul>
*
*/
public class SourceJobsManager extends Thread {
@Override
public void run() {
// assigns scheduled files to per-source jobs and schedules job for additional execution
while ( true ) {
try {
File f = scheduledFiles.take();
PmFileProcessingJob job = getSourceJob(f);
job.scheduleSourceFile(f);
jobsExecutorService.execute(job); // schedules job execution
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO: check disk space periodically
}
}
/**
* Finds existing job for file source or creates a new one.
*
* @param f
* @return
*/
private PmFileProcessingJob getSourceJob(File f) {
// TODO: test code
String fname = f.getName();
String[] parts = fname.split("_");
String uid = parts[0];
PmFileProcessingJob res = allJobs.get(uid);
if ( res == null ) {
res = new PmFileProcessingJob(uid);
allJobs.put(uid, res);
}
return res;
}
}
/**
* Process first file from scheduledSourceFiles queue (i.e. each job execution processes a single file or
* reschedules itself for later execution if another thread already processes the file from the same source).
*
* @author
* <ul>
* <li>Zur13</li>
* </ul>
*
*/
public class PmFileProcessingJob implements Runnable {
public final String fileSourceUidString;
PriorityQueue<File> scheduledSourceFiles = new PriorityQueue<>(1000, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
// TODO Auto-generated method stub
return 0;
}
});
Semaphore onePassSemaphore = new Semaphore(1);
public PmFileProcessingJob(String fileSourceUid) {
super();
this.fileSourceUidString = fileSourceUid;
}
/**
* Schedules file from for processing by this job.
*/
public void scheduleSourceFile(File f) {
scheduledSourceFiles.add(f);
}
@Override
public void run() {
File f = null;
if ( scheduledSourceFiles.size() > 0 ) { // fail fast optimization 1
if ( onePassSemaphore.tryAcquire() ) { // fail fast optimization 2
try {
f = scheduledSourceFiles.poll();
if ( f != null ) {
// TODO: process the file
try {
System.err.println(f.getName() + "\t" + Thread.currentThread().getId());
Thread.sleep(1000);
return;
} catch (Exception e) {
// TODO: handle exception
return; // prevents reschedule loop for failing files
}
} else {
// scheduledSourceFiles queue is empty
return;
}
} finally {
onePassSemaphore.release();
}
}
if ( f == null && scheduledSourceFiles.size() > 0 ) {
// this thread did not process the scheduled file because another thread holds the critical section
// pass
// this thread should reschedule this Job to release this thread and try to process this job later
// with another thread
// reschedule the job with 4 seconds delay to prevent excess CPU usage
// System.err.println("RESCHEDULE");
jobsExecutorService.schedule(this, 3, TimeUnit.SECONDS);
}
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((this.fileSourceUidString == null) ? 0 : this.fileSourceUidString.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if ( this == obj )
return true;
if ( !(obj instanceof PmFileProcessingJob) )
return false;
PmFileProcessingJob other = (PmFileProcessingJob) obj;
if ( this.fileSourceUidString == null ) {
if ( other.fileSourceUidString != null )
return false;
} else if ( !this.fileSourceUidString.equals(other.fileSourceUidString) )
return false;
return true;
}
}
public static void main(String[] args) {
PmFileProcessor fp = new PmFileProcessor();
fp.unitTest();
}
private void unitTest() {
// TODO Auto-generated method stub
int filesCount = 1000;
for (int i = 0; i < filesCount; i++) {
int sourceUid = ThreadLocalRandom.current().nextInt(1, 30);
File f = new File(sourceUid + "_" + i);
scheduleFile(f);
}
Thread.yield();
try {
Thread.sleep(999000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}