使用阻塞队列的对象池在死锁中运行
Object pool using blocking queue runs in deadlock
我在 payara 5 中有一个 java 应用程序 运行ning。
我需要汇集我的 bean 将使用的一些引擎对象(来自库)。
创建引擎需要在单独的线程中完成。
因此我想出了我的 EnginePool 和我的 EngineProducer。
这个想法是 EnginePool 管理两个 BlockingQueues。一个用于可用引擎,一个用于被 bean 使用并需要再次可用的引擎。
EnginePool 应该只可用一次,所以它是一个单例。
@Singleton
@Startup
@LocalBean
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class AbbyyEnginePool implements OcrEnginePool<IEngine> {
private static final Logger logger = LoggerFactory.getLogger(AbbyyEnginePool.class);
@Resource(lookup = "java:comp/DefaultManagedThreadFactory")
private ManagedThreadFactory threadFactory;
private static final int DEFAULT_ENGINE_COUNT = 3;
private BlockingQueue<EngineMetaInfo> availableEngines = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private BlockingQueue<IEngine> enginesToRelease = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private Map<IEngine, IEngine> proxiesMapping = new ConcurrentHashMap<>(DEFAULT_ENGINE_COUNT);
private int poolSize;
public AbbyyEnginePool() {
this(DEFAULT_ENGINE_COUNT);
}
public AbbyyEnginePool(int poolSize) {
this.poolSize = poolSize;
availableEngines = new ArrayBlockingQueue<>(poolSize);
enginesToRelease = new ArrayBlockingQueue<>(poolSize);
proxiesMapping = new ConcurrentHashMap<>(poolSize);
}
void setThreadFactory(ManagedThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@PostConstruct
void init() {
EngineProducer engineProducer = new EngineProducer(availableEngines, enginesToRelease, poolSize);
Thread engineProducerThread = threadFactory.newThread(engineProducer);
engineProducerThread.setName("engineProducer");
engineProducerThread.start();
}
@Override
public IEngine get() throws EngineException {
try {
EngineMetaInfo engineMetaInfo = availableEngines.take();
IEngine engineProxy = IEngine.UnmarshalInterface(engineMetaInfo.engineHandle);
proxiesMapping.put(engineProxy, engineMetaInfo.engine);
return engineProxy;
} catch (InterruptedException e) {
throw new EngineException("Could not retrieve engine", e);
}
}
@Override
public void release(IEngine engineProxy) throws EngineException {
if (engineProxy != null) {
synchronized (proxiesMapping) {
if (proxiesMapping.containsKey(engineProxy)) {
try {
IEngine engine = proxiesMapping.remove(engineProxy);
enginesToRelease.put(engine);
} catch (Exception e) {
throw new EngineException("Could not release engine proxy.");
}
} else {
logger.warn("Engine proxy was not registered. Could not release proxy.");
}
}
}
}
static class EngineMetaInfo {
long engineHandle;
IEngine engine;
EngineMetaInfo(long engineHandle, IEngine engine) {
this.engineHandle = engineHandle;
this.engine = engine;
}
}
}
EngineProducer 看起来像这样:
public class EngineProducer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(EngineProducer.class);
private static final String PROJECT_ID = "someId";
private final Integer MAX_ENGINE_COUNT;
private final BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines;
private final BlockingQueue<IEngine> enginesToRelease;
private Boolean isRunning = Boolean.FALSE;
private List<EngineHolder> enginesHolder;
public EngineProducer(BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines,
BlockingQueue<IEngine> enginesToRelease,
Integer maxEnginesCount) {
this.availableEngines = availableEngines;
this.enginesToRelease = enginesToRelease;
this.MAX_ENGINE_COUNT = maxEnginesCount;
this.enginesHolder = new ArrayList<>(MAX_ENGINE_COUNT);
}
private void initEngines() {
synchronized (availableEngines) {
if (availableEngines.size() == 0) {
try {
for (int i = 0; i < MAX_ENGINE_COUNT; i++) {
EngineHolder engineHolder = new EngineHolder(PROJECT_ID);
enginesHolder.add(engineHolder);
IEngine engine = engineHolder.getAndLockEngine();
long engineHandle = engine.MarshalInterface();
AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
availableEngines.put(engineMetaInfo);
}
logger.info("{} abbyy engines prepared for processing", MAX_ENGINE_COUNT);
EnginePool.setInitialized(Boolean.TRUE);
isRunning = Boolean.TRUE;
} catch (Exception e) {
logger.error("Could not instantiate engines.", e);
}
}
}
}
@Override
public void run() {
try {
initEngines();
while(isRunning) {
IEngine engineProxyToRelease = enginesToRelease.take();
releaseEngine(engineProxyToRelease);
}
availableEngines.clear();
for(int i = 0; i < enginesHolder.size(); i++) {
enginesHolder.get(i).unloadEngine();
}
} catch (Exception e) {
logger.error("EngineProducer encounter a problem.", e);
}
}
public void unloadEngines() {
isRunning = Boolean.FALSE;
}
private void releaseEngine( IEngine engineToRelease ) {
for (EngineHolder engineHolder : enginesHolder) {
if (engineHolder.containsEngine(engineToRelease)) {
engineHolder.unlockEngine();
IEngine engine = engineHolder.getAndLockEngine();
long engineHandle = engine.MarshalInterface();
AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
try {
availableEngines.put(engineMetaInfo);
} catch (InterruptedException e) {
logger.warn("could not add free engine");
}
break;
}
}
}
}
当我 运行 在测试中而不是在 glassfish 中时,它 运行 没有问题。
但是当我 运行 它在 glassfish 中时,豆子会 运行 陷入僵局。
bean 使用此代码获取和释放引擎:
engine = enginePool.get();
ProcessingResult processingResult = null;
try {
this.parameters = parameters;
this.tmpDir = tmpDir;
Path customProfileFile = loadProfiles(parameters);
Instant processingStart = Instant.now();
processingResult = processFile();
Instant processingEnd = Instant.now();
enginePool.release(engine);
engine = null;
processingResult.setProcessingStartTime(processingStart);
processingResult.setProcessingEndTime(processingEnd);
logger.info("Processing took about {} milliseconds.", processingResult.getProcessDurationInMilliseconds());
customProfileFile.toFile().delete();
this.tmpDir.toFile().delete();
} catch (Exception e) {
logger.error("Ocr of document failed ",e );
enginePool.release(engine);
throw new EngineException("Ocr of document failed.", e);
}
在我的场景中,有 4 个 bean 试图获取引擎。其中 3 个将获得一个,最后一个 bean 将等待 engine = enginePool.get();
获得引擎的 3 个 bean 将完成它们的工作并等待 enginePool.release(engine);
。我进行了线程转储,可以看到 3 个 bean 正在等待没有引擎的 bean 持有的锁。所以他们不能释放引擎。
我的问题是我不明白。释放和获取引擎在不同的阻塞队列上工作,所以我想知道为什么最后一个等待获取引擎的 bean 会阻塞其他试图释放引擎的 bean。
问题是容器管理所有并发。在单例的情况下,这意味着对字段的所有访问都将获得写锁。
解决方案是使用@ConcurrencyManagement(BEAN) 注释。这意味着 bean 控制并发管理并且必须确保完成同步。
可以找到详细的解释here。
我在 payara 5 中有一个 java 应用程序 运行ning。
我需要汇集我的 bean 将使用的一些引擎对象(来自库)。 创建引擎需要在单独的线程中完成。
因此我想出了我的 EnginePool 和我的 EngineProducer。 这个想法是 EnginePool 管理两个 BlockingQueues。一个用于可用引擎,一个用于被 bean 使用并需要再次可用的引擎。 EnginePool 应该只可用一次,所以它是一个单例。
@Singleton
@Startup
@LocalBean
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class AbbyyEnginePool implements OcrEnginePool<IEngine> {
private static final Logger logger = LoggerFactory.getLogger(AbbyyEnginePool.class);
@Resource(lookup = "java:comp/DefaultManagedThreadFactory")
private ManagedThreadFactory threadFactory;
private static final int DEFAULT_ENGINE_COUNT = 3;
private BlockingQueue<EngineMetaInfo> availableEngines = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private BlockingQueue<IEngine> enginesToRelease = new ArrayBlockingQueue<>(DEFAULT_ENGINE_COUNT);
private Map<IEngine, IEngine> proxiesMapping = new ConcurrentHashMap<>(DEFAULT_ENGINE_COUNT);
private int poolSize;
public AbbyyEnginePool() {
this(DEFAULT_ENGINE_COUNT);
}
public AbbyyEnginePool(int poolSize) {
this.poolSize = poolSize;
availableEngines = new ArrayBlockingQueue<>(poolSize);
enginesToRelease = new ArrayBlockingQueue<>(poolSize);
proxiesMapping = new ConcurrentHashMap<>(poolSize);
}
void setThreadFactory(ManagedThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@PostConstruct
void init() {
EngineProducer engineProducer = new EngineProducer(availableEngines, enginesToRelease, poolSize);
Thread engineProducerThread = threadFactory.newThread(engineProducer);
engineProducerThread.setName("engineProducer");
engineProducerThread.start();
}
@Override
public IEngine get() throws EngineException {
try {
EngineMetaInfo engineMetaInfo = availableEngines.take();
IEngine engineProxy = IEngine.UnmarshalInterface(engineMetaInfo.engineHandle);
proxiesMapping.put(engineProxy, engineMetaInfo.engine);
return engineProxy;
} catch (InterruptedException e) {
throw new EngineException("Could not retrieve engine", e);
}
}
@Override
public void release(IEngine engineProxy) throws EngineException {
if (engineProxy != null) {
synchronized (proxiesMapping) {
if (proxiesMapping.containsKey(engineProxy)) {
try {
IEngine engine = proxiesMapping.remove(engineProxy);
enginesToRelease.put(engine);
} catch (Exception e) {
throw new EngineException("Could not release engine proxy.");
}
} else {
logger.warn("Engine proxy was not registered. Could not release proxy.");
}
}
}
}
static class EngineMetaInfo {
long engineHandle;
IEngine engine;
EngineMetaInfo(long engineHandle, IEngine engine) {
this.engineHandle = engineHandle;
this.engine = engine;
}
}
}
EngineProducer 看起来像这样:
public class EngineProducer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(EngineProducer.class);
private static final String PROJECT_ID = "someId";
private final Integer MAX_ENGINE_COUNT;
private final BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines;
private final BlockingQueue<IEngine> enginesToRelease;
private Boolean isRunning = Boolean.FALSE;
private List<EngineHolder> enginesHolder;
public EngineProducer(BlockingQueue<AbbyyEnginePool.EngineMetaInfo> availableEngines,
BlockingQueue<IEngine> enginesToRelease,
Integer maxEnginesCount) {
this.availableEngines = availableEngines;
this.enginesToRelease = enginesToRelease;
this.MAX_ENGINE_COUNT = maxEnginesCount;
this.enginesHolder = new ArrayList<>(MAX_ENGINE_COUNT);
}
private void initEngines() {
synchronized (availableEngines) {
if (availableEngines.size() == 0) {
try {
for (int i = 0; i < MAX_ENGINE_COUNT; i++) {
EngineHolder engineHolder = new EngineHolder(PROJECT_ID);
enginesHolder.add(engineHolder);
IEngine engine = engineHolder.getAndLockEngine();
long engineHandle = engine.MarshalInterface();
AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
availableEngines.put(engineMetaInfo);
}
logger.info("{} abbyy engines prepared for processing", MAX_ENGINE_COUNT);
EnginePool.setInitialized(Boolean.TRUE);
isRunning = Boolean.TRUE;
} catch (Exception e) {
logger.error("Could not instantiate engines.", e);
}
}
}
}
@Override
public void run() {
try {
initEngines();
while(isRunning) {
IEngine engineProxyToRelease = enginesToRelease.take();
releaseEngine(engineProxyToRelease);
}
availableEngines.clear();
for(int i = 0; i < enginesHolder.size(); i++) {
enginesHolder.get(i).unloadEngine();
}
} catch (Exception e) {
logger.error("EngineProducer encounter a problem.", e);
}
}
public void unloadEngines() {
isRunning = Boolean.FALSE;
}
private void releaseEngine( IEngine engineToRelease ) {
for (EngineHolder engineHolder : enginesHolder) {
if (engineHolder.containsEngine(engineToRelease)) {
engineHolder.unlockEngine();
IEngine engine = engineHolder.getAndLockEngine();
long engineHandle = engine.MarshalInterface();
AbbyyEnginePool.EngineMetaInfo engineMetaInfo = new AbbyyEnginePool.EngineMetaInfo(engineHandle, engine);
try {
availableEngines.put(engineMetaInfo);
} catch (InterruptedException e) {
logger.warn("could not add free engine");
}
break;
}
}
}
}
当我 运行 在测试中而不是在 glassfish 中时,它 运行 没有问题。 但是当我 运行 它在 glassfish 中时,豆子会 运行 陷入僵局。
bean 使用此代码获取和释放引擎:
engine = enginePool.get();
ProcessingResult processingResult = null;
try {
this.parameters = parameters;
this.tmpDir = tmpDir;
Path customProfileFile = loadProfiles(parameters);
Instant processingStart = Instant.now();
processingResult = processFile();
Instant processingEnd = Instant.now();
enginePool.release(engine);
engine = null;
processingResult.setProcessingStartTime(processingStart);
processingResult.setProcessingEndTime(processingEnd);
logger.info("Processing took about {} milliseconds.", processingResult.getProcessDurationInMilliseconds());
customProfileFile.toFile().delete();
this.tmpDir.toFile().delete();
} catch (Exception e) {
logger.error("Ocr of document failed ",e );
enginePool.release(engine);
throw new EngineException("Ocr of document failed.", e);
}
在我的场景中,有 4 个 bean 试图获取引擎。其中 3 个将获得一个,最后一个 bean 将等待 engine = enginePool.get();
获得引擎的 3 个 bean 将完成它们的工作并等待 enginePool.release(engine);
。我进行了线程转储,可以看到 3 个 bean 正在等待没有引擎的 bean 持有的锁。所以他们不能释放引擎。
我的问题是我不明白。释放和获取引擎在不同的阻塞队列上工作,所以我想知道为什么最后一个等待获取引擎的 bean 会阻塞其他试图释放引擎的 bean。
问题是容器管理所有并发。在单例的情况下,这意味着对字段的所有访问都将获得写锁。
解决方案是使用@ConcurrencyManagement(BEAN) 注释。这意味着 bean 控制并发管理并且必须确保完成同步。
可以找到详细的解释here。