'data race' 可以在带有 InnoDB 的 MariaDB 中使用触发器吗?
Can 'data race' happen with triggers in MariaDB with InnoDB?
我的应用程序处理大量实时数据(每天超过 2 亿),我需要实时汇总它们以保持报告性能。数据由服务器的多个线程随机馈送和处理。
我使用 MariaDB 10.5.6-MariaDB
和 InnoDB 10.5.6
您知道触发器是否是线程安全的吗,即数据竞争是否会发生。换句话说,当 1000 次更新 - 仅递增 - 在一秒内通过 10 个连接发生在单行中的相同列时,数据将不会被弄乱,结果将就像值由单个连接串联求和.
您知道行级锁定是如何工作的吗?它是自动的还是可以手动强制执行的。
分享您的一些相关书签也将不胜感激,因为我没有在 google.
中找到任何简洁有用的内容
更新
我添加了一个插入后触发器,如果该记录不存在,它会在报告 table 中创建一条新记录,然后使用更新语句 update table set field=value+delta where condition
更新列。
数据库不喜欢它,正在发送数据的应用程序 - java,hibernate - 也无法忍受并开始抛出异常:
- 这与 hibernate 试图插入的行完全无关,因为它没有尝试更新。显然它来自 MariaDB 触发器:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
- 我不确定为什么会发生这种情况,但也得到了一些:
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
我发现触发器不是线程安全的,因为数据库开始为同一行的并发更新抛出不同的错误:
- 行已被另一个事务更新或删除(或未保存的值映射不正确)
- 尝试获取锁时发现死锁;尝试重新启动交易
我尝试引入行级锁定,但它根本不起作用。我相信锁被忽略了或者行根本没有被锁定
$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
- 自动提交已禁用
- 事务隔离更改为读提交
- 尝试使用主键
SELECT what FROM tables WHERE conditions FOR UPDATE
进行行级锁定
也尝试了 table 级锁定等效解决方案以及使用单个线程持久化数据,但它无法处理我拥有的数据量。
我寻求的解决方案是将提要处理与持久性进行线程级隔离,方法是多个线程处理传入的数据提要并为另一组线程创建实体对象以将它们保存在数据库中。这使我能够进行实验并为我的平台找到每个区域的最佳线程数,例如目前,我正在测试 8 个线程处理传入的提要并为另外 4 个线程创建实体对象,这些线程负责将它们保存在数据库中。对于持久线程,我在应用层引入了一些智能隔离和自定义锁定实体集,以确保没有两个线程试图同时更新同一行。这似乎可行,我现在只需要为这两个区域找到合适的线程数。
这是消费者class,它为数据库编写者
生成积压
protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);
private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
persister.saveAll(dao, entry.getKey(), entry.getValue());
}
}
这是数据库编写者的积压工作
private LinkedBlockingQueue<Key> keys;
private Map<Key, Set> backlog;
public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
Key<ENTITY> key = Key.get(dao, bucket);
synchronized (key) {
synchronized (backlog) {
if (backlog.containsKey(key)) {
backlog.get(key).addAll(entitySet);
} else {
backlog.put(key, entitySet);
try {
keys.put(key);
} catch (InterruptedException ex) {
}
}
}
}
}
这是DB writer的核心
private void processDBBatchUpdate(Key key) {
synchronized (key) {
Set set;
synchronized (backlog) {
set = backlog.remove(key);
}
key.getDao().saveAll(set);
}
}
这是锁定class的钥匙
private IDefaultEntityDAO<ENTITY> dao;
private String bucket;
private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();
private Key(IDefaultEntityDAO dao, String bucket) {
this.dao = dao;
this.bucket = bucket;
}
public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
if (!keys.containsKey(dao)) {
keys.put(dao, new HashMap<>());
}
if (!keys.get(dao).containsKey(bucket)) {
keys.get(dao).put(bucket, new Key(dao, bucket));
}
return keys.get(dao).get(bucket);
}
我的应用程序处理大量实时数据(每天超过 2 亿),我需要实时汇总它们以保持报告性能。数据由服务器的多个线程随机馈送和处理。
我使用 MariaDB 10.5.6-MariaDB
和 InnoDB 10.5.6
您知道触发器是否是线程安全的吗,即数据竞争是否会发生。换句话说,当 1000 次更新 - 仅递增 - 在一秒内通过 10 个连接发生在单行中的相同列时,数据将不会被弄乱,结果将就像值由单个连接串联求和.
您知道行级锁定是如何工作的吗?它是自动的还是可以手动强制执行的。
分享您的一些相关书签也将不胜感激,因为我没有在 google.
中找到任何简洁有用的内容更新
我添加了一个插入后触发器,如果该记录不存在,它会在报告 table 中创建一条新记录,然后使用更新语句 update table set field=value+delta where condition
更新列。
数据库不喜欢它,正在发送数据的应用程序 - java,hibernate - 也无法忍受并开始抛出异常:
- 这与 hibernate 试图插入的行完全无关,因为它没有尝试更新。显然它来自 MariaDB 触发器:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
- 我不确定为什么会发生这种情况,但也得到了一些:
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
我发现触发器不是线程安全的,因为数据库开始为同一行的并发更新抛出不同的错误:
- 行已被另一个事务更新或删除(或未保存的值映射不正确)
- 尝试获取锁时发现死锁;尝试重新启动交易
我尝试引入行级锁定,但它根本不起作用。我相信锁被忽略了或者行根本没有被锁定
$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
- 自动提交已禁用
- 事务隔离更改为读提交
- 尝试使用主键
SELECT what FROM tables WHERE conditions FOR UPDATE
进行行级锁定
也尝试了 table 级锁定等效解决方案以及使用单个线程持久化数据,但它无法处理我拥有的数据量。
我寻求的解决方案是将提要处理与持久性进行线程级隔离,方法是多个线程处理传入的数据提要并为另一组线程创建实体对象以将它们保存在数据库中。这使我能够进行实验并为我的平台找到每个区域的最佳线程数,例如目前,我正在测试 8 个线程处理传入的提要并为另外 4 个线程创建实体对象,这些线程负责将它们保存在数据库中。对于持久线程,我在应用层引入了一些智能隔离和自定义锁定实体集,以确保没有两个线程试图同时更新同一行。这似乎可行,我现在只需要为这两个区域找到合适的线程数。
这是消费者class,它为数据库编写者
生成积压 protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);
private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
persister.saveAll(dao, entry.getKey(), entry.getValue());
}
}
这是数据库编写者的积压工作
private LinkedBlockingQueue<Key> keys;
private Map<Key, Set> backlog;
public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
Key<ENTITY> key = Key.get(dao, bucket);
synchronized (key) {
synchronized (backlog) {
if (backlog.containsKey(key)) {
backlog.get(key).addAll(entitySet);
} else {
backlog.put(key, entitySet);
try {
keys.put(key);
} catch (InterruptedException ex) {
}
}
}
}
}
这是DB writer的核心
private void processDBBatchUpdate(Key key) {
synchronized (key) {
Set set;
synchronized (backlog) {
set = backlog.remove(key);
}
key.getDao().saveAll(set);
}
}
这是锁定class的钥匙
private IDefaultEntityDAO<ENTITY> dao;
private String bucket;
private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();
private Key(IDefaultEntityDAO dao, String bucket) {
this.dao = dao;
this.bucket = bucket;
}
public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
if (!keys.containsKey(dao)) {
keys.put(dao, new HashMap<>());
}
if (!keys.get(dao).containsKey(bucket)) {
keys.get(dao).put(bucket, new Key(dao, bucket));
}
return keys.get(dao).get(bucket);
}