Concurrent entity access in Hibernate (JPA)
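I am trying to create a simple sequence-table data accessor. The problem is that I am not sure whether my approach is right and, if it is, how to configure transaction isolation. We can safely assume that @Transactional is configured correctly in the Spring context, because transactions work fine elsewhere.
I want a fully thread-safe DAO implementation (or a service using the DAO) that provides a guaranteed next-in-line value for a given sequence. Unfortunately, I cannot use the built-in sequence generators, because I need the values outside of entities, and I cannot use GUIDs to generate IDs.
Entity: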
@Entity
@Table(name = "sys_sequence")
public class SequenceEntity
{
    @Id
    @Column(name = "ID_SEQ_NAME", length = 32)
    private String name;

    @Basic
    @Column(name = "N_SEQ_VALUE")
    private int value;

    // constructors, getters & setters...
}
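DAO implementation (note that the current isolation and lock-mode settings are just the latest test values I have tried, without success, because the entity stays locked and the query times out):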
public class SequenceDaoImpl
    extends AbstractHibernateDao
    implements SequenceDao
{
    private static final Logger logger = Logger.getLogger(SequenceDaoImpl.class);
    private static final Object lock = new Object();

    /**
     * Initializes sequence with default initial value zero (0).
     * Next value will be +1, therefore one (1).
     *
     * @param sequenceName Name of the sequence
     */
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void initializeSequence(String sequenceName)
    {
        this.initializeSequence(sequenceName, 0);
    }

    /**
     * Initializes sequence with given initial value.
     * Next value will be +1, therefore initialValue + 1.
     *
     * @param sequenceName Name of the sequence
     * @param initialValue Initial value of sequence
     */
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void initializeSequence(String sequenceName, int initialValue)
    {
        synchronized (lock)
        {
            Session session = this.getCurrentSession();
            try
            {
                logger.debug("Creating new sequence '" + sequenceName + "' with initial value " + initialValue);
                // create new sequence
                SequenceEntity seq = new SequenceEntity(sequenceName, initialValue);
                // save it to database
                session.persist(seq);
                session.flush();
            }
            catch (Exception ex)
            {
                throw new SequenceException("Unable to initialize sequence '" + sequenceName + "'.", ex);
            }
        }
    }

    /**
     * Returns next value for given sequence, incrementing it automatically.
     *
     * @param sequenceName Name of the sequence to use
     * @return Next value for this sequence
     * @throws SequenceException
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE, timeout = 5)
    public int getNextValue(String sequenceName)
    {
        synchronized (lock)
        {
            Session session = this.getCurrentSession();
            SequenceEntity seq = (SequenceEntity) session.createCriteria(SequenceEntity.class)
                .add(Restrictions.eq("name", sequenceName))
                .setLockMode(LockMode.PESSIMISTIC_WRITE)
                .uniqueResult();
            if (seq == null)
            {
                throw new SequenceException("Sequence '" + sequenceName + "' must be initialized first.");
            }
            seq.incValue();
            session.update(seq);
            session.flush();
            // return the new value
            return (seq.getValue());
        }
    }
}
AbstractHibernateDao has a few common methods; only this one is used here:
public Session getCurrentSession()
{
    return (this.getEntityManager().unwrap(Session.class));
}
I am testing the code with a simple test:
public class SequenceDaoImplTest
    extends AbstractDbTest
{
    private static final int NUM_CONCURRENT_TASKS = 2;

    protected class GetNextValueTask
        implements Runnable
    {
        private int identifier;
        private String sequenceName;
        private List<Integer> nextValues = new LinkedList<>();
        private int iterations;
        private boolean error;

        public GetNextValueTask(int identifier, String sequenceName, int iterations)
        {
            this.identifier = identifier;
            this.sequenceName = sequenceName;
            this.iterations = iterations;
        }

        @Override
        public void run()
        {
            try
            {
                logger.debug("Starting test task #" + this.identifier + " with sequence: " + this.sequenceName);
                for (int x = 0; x < this.iterations; x++)
                {
                    logger.debug("Task #" + this.identifier + ": iteration #" + x + "; sequenceName=" + this.sequenceName);
                    nextValues.add(sequenceDao.getNextValue(this.sequenceName));
                }
                logger.debug("Completed test task #" + this.identifier);
                logger.debug(this.toValuesString());
            }
            catch (Exception ex)
            {
                logger.error("Task #" + this.identifier, ex);
                error = true;
            }
        }

        public String toValuesString()
        {
            return (StringUtils.join(nextValues, ','));
        }

        public boolean isError()
        {
            return error;
        }
    }

    @Autowired
    private SequenceDao sequenceDao;

    @Test
    public void testGetNextValue()
        throws Exception
    {
        sequenceDao.initializeSequence("SEQ_1");
        for (int x = 1; x <= 10; x++)
        {
            Assert.assertEquals(x, sequenceDao.getNextValue("SEQ_1"));
        }
    }

    @Test
    public void testGetNextValueConcurrent()
        throws Exception
    {
        sequenceDao.initializeSequence("SEQ_2");
        ExecutorService executorService = Executors.newCachedThreadPool();
        GetNextValueTask[] tasks = new GetNextValueTask[NUM_CONCURRENT_TASKS];
        for (int x = 0; x < NUM_CONCURRENT_TASKS; x++)
        {
            tasks[x] = new GetNextValueTask(x, "SEQ_2", 100);
            executorService.execute(tasks[x]);
        }
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        boolean isError = false;
        for (int x = 0; x < NUM_CONCURRENT_TASKS; x++)
        {
            isError |= tasks[x].isError();
        }
        Assert.assertFalse("There was no error while running tasks.", isError);
    }
}
The first test runs fine, which I can only assume is because it runs on a single thread. The second (concurrent) test logs the following:
pool-1-thread-2 | DEBUG | Starting test task #1 with sequence: SEQ_2 (SequenceDaoImplTest.java:41)
pool-1-thread-1 | DEBUG | Starting test task #0 with sequence: SEQ_2 (SequenceDaoImplTest.java:41)
pool-1-thread-2 | DEBUG | Task #1: iteration #0; sequenceName=SEQ_2 (SequenceDaoImplTest.java:44)
pool-1-thread-1 | DEBUG | Task #0: iteration #0; sequenceName=SEQ_2 (SequenceDaoImplTest.java:44)
pool-1-thread-1 | WARN | SQL Error: -4872, SQLState: 40502 (SqlExceptionHelper.java:144)
pool-1-thread-1 | ERROR | statement execution aborted: timeout reached (SqlExceptionHelper.java:146)
pool-1-thread-1 | ERROR | Task #0 (SequenceDaoImplTest.java:52)
Thanks!
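I finally got it working and found out a few things I had not been fully aware of:
1. The code failed when initializeSequence(String) was called on a different thread than getNextValue(String). Moving the initialization code into getNextValue(String) therefore solved the problem. I could not find a proper explanation for this in the documentation, so I am treating it as a rule of thumb and will investigate further.
2. Only methods that are called from outside should be annotated; methods called internally get no annotation. (This was not actually the problem in my code, but I did not know about it, and it is related.)
Spring Documentation: In proxy mode (which is the default), only external method calls coming in through the proxy are intercepted. This means that self-invocation, in effect, a method within the target object calling another method of the target object, will not lead to an actual transaction at runtime even if the invoked method is marked with @Transactional.
3. The synchronized block, which was meant as a second line of defense, has been moved to the SequenceService class, which carries the @Transactional annotation and is the one accessed from outside.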
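The self-invocation rule quoted from the Spring documentation can be reproduced without Spring. The following is only a minimal sketch using a plain JDK dynamic proxy in place of Spring's transactional proxy; the Sequences interface, SequencesImpl and wrap(...) are hypothetical names made up for the illustration, and the recording handler merely marks the places where a transaction interceptor would do its work.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    /** Hypothetical interface, loosely modelled on the DAO in the question. */
    public interface Sequences {
        int next();
        int nextViaSelfCall();
    }

    /** Plain target object; it knows nothing about the proxy wrapped around it. */
    public static class SequencesImpl implements Sequences {
        private int value;

        @Override
        public int next() {
            return ++value;
        }

        @Override
        public int nextViaSelfCall() {
            // "this" is the raw target, so this call never passes through the proxy.
            return this.next();
        }
    }

    /** Wraps the target in a proxy that records the name of every call it intercepts. */
    public static Sequences wrap(Sequences target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());  // an interceptor would begin a transaction here
            return method.invoke(target, args); // and commit or roll back after this returns
        };
        return (Sequences) Proxy.newProxyInstance(
                Sequences.class.getClassLoader(),
                new Class<?>[] {Sequences.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        Sequences proxied = wrap(new SequencesImpl(), intercepted);

        proxied.next();            // external call: intercepted
        proxied.nextViaSelfCall(); // external call: intercepted, but the inner next() is not

        System.out.println(intercepted); // prints [next, nextViaSelfCall]
    }
}
```

The inner call to next() never shows up in the list, which is the same reason an annotation on an internally called method has no effect in proxy mode.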
Final code for int getNextValue(String, boolean):
@Override
public int getNextValue(String sequenceName, boolean autoInit)
{
    Session session = this.getCurrentSession();
    SequenceEntity seq = (SequenceEntity) session.createCriteria(SequenceEntity.class)
        .add(Restrictions.eq("name", sequenceName))
        .setLockMode(LockMode.PESSIMISTIC_WRITE)
        .uniqueResult();
    if (seq == null)
    {
        if (!autoInit)
        {
            throw new SequenceException("Sequence '" + sequenceName + "' must be initialized first.");
        }
        // initializeSequence(String) must return the newly created entity for this to compile
        seq = this.initializeSequence(sequenceName);
    }
    seq.incValue();
    session.update(seq);
    session.flush();
    // return the new value
    return (seq.getValue());
}
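The lock, increment, return flow of the method above can be tried out without a database. What follows is an in-memory analogy only, not the Hibernate code: a ReentrantLock stands in for the row lock that PESSIMISTIC_WRITE requests, and InMemorySequenceDemo, Row and runConcurrently(...) are hypothetical names introduced for the sketch. It also shows one way to check that no value is handed out twice under concurrency.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class InMemorySequenceDemo {

    /** Hypothetical stand-in for one row of sys_sequence plus its row lock. */
    static final class Row {
        final ReentrantLock lock = new ReentrantLock();
        int value; // starts at 0, like initializeSequence(name)
    }

    private final ConcurrentHashMap<String, Row> rows = new ConcurrentHashMap<>();

    /** Mirrors getNextValue(name, autoInit): find the row, lock it, increment, return. */
    public int getNextValue(String name, boolean autoInit) {
        Row row = autoInit
                ? rows.computeIfAbsent(name, n -> new Row()) // atomic "insert if missing"
                : rows.get(name);
        if (row == null) {
            throw new IllegalStateException("Sequence '" + name + "' must be initialized first.");
        }
        row.lock.lock(); // plays the role of the pessimistic row lock
        try {
            return ++row.value;
        } finally {
            row.lock.unlock(); // a database would release the row lock at commit or rollback
        }
    }

    /** Runs several tasks concurrently and returns how many distinct values were handed out. */
    public static int runConcurrently(int tasks, int iterations) {
        InMemorySequenceDemo sequences = new InMemorySequenceDemo();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int t = 0; t < tasks; t++) {
            pool.execute(() -> {
                for (int i = 0; i < iterations; i++) {
                    seen.add(sequences.getNextValue("SEQ_2", true));
                }
            });
        }
        pool.shutdown(); // without this, awaitTermination just waits for the timeout to elapse
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", ex);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 4 tasks x 1000 iterations: 4000 distinct values means no duplicates were issued
        System.out.println(runConcurrently(4, 1000)); // prints 4000
    }
}
```

Counting distinct values is a stricter check than only asserting that no task threw an exception, since duplicates would otherwise go unnoticed.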
And the SequenceService method int getNextValue(String):
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public int getNextValue(String sequenceName)
{
    synchronized (lock)
    {
        return (this.sequenceDao.getNextValue(sequenceName));
    }
}
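The synchronized block is not strictly necessary, but I keep it as a second line of defense in case the database server does not support transactions properly. The performance penalty is irrelevant for this method.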
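One caveat with a synchronized block placed inside a proxied @Transactional method: the proxy commits only after the method has returned, that is, after the monitor has already been released, so the block by itself does not cover the commit. The ordering can be sketched with a plain JDK proxy; the "begin transaction" and "commit transaction" events below are simulated markers, not real Spring calls, and all names in the example are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LockVersusCommitDemo {

    /** Hypothetical service interface, loosely modelled on SequenceService. */
    public interface SequenceService {
        int getNextValue();
    }

    /** Calls the proxied method once and returns the order in which things happened. */
    public static List<String> trace() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Object lock = new Object();
        int[] counter = {0};

        // Target method: does its work inside a synchronized block, like the service method.
        SequenceService target = () -> {
            synchronized (lock) {
                events.add("enter synchronized");
                int next = ++counter[0];
                events.add("leave synchronized"); // last statement before the monitor is released
                return next;
            }
        };

        // Simulated transactional proxy: begins before the call, commits after it returns.
        SequenceService proxied = (SequenceService) Proxy.newProxyInstance(
                SequenceService.class.getClassLoader(),
                new Class<?>[] {SequenceService.class},
                (proxy, method, args) -> {
                    events.add("begin transaction");
                    Object result = method.invoke(target, args);
                    events.add("commit transaction");
                    return result;
                });

        proxied.getNextValue();
        return events;
    }

    public static void main(String[] args) {
        // prints [begin transaction, enter synchronized, leave synchronized, commit transaction]
        System.out.println(trace());
    }
}
```

Because the commit comes last, it is the database row lock that actually serializes concurrent callers up to the commit; the JVM lock only serializes the work done inside the method body.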