'data race' 可以在带有 InnoDB 的 MariaDB 中使用触发器吗?

Can 'data race' happen with triggers in MariaDB with InnoDB?

我的应用程序处理大量实时数据(每天超过 2 亿),我需要实时汇总它们以保持报告性能。数据由服务器的多个线程随机馈送和处理。

我使用 MariaDB 10.5.6-MariaDBInnoDB 10.5.6

分享您的一些相关书签也将不胜感激,因为我没有在 google.

中找到任何简洁有用的内容

更新

我添加了一个插入后触发器,如果​​该记录不存在,它会在报告 table 中创建一条新记录,然后使用更新语句 update table set field=value+delta where condition 更新列。 数据库不喜欢它,正在发送数据的应用程序 - java,hibernate - 也无法忍受并开始抛出异常:

我发现触发器不是线程安全的,因为数据库开始为同一行的并发更新抛出不同的错误:

  • 行已被另一个事务更新或删除(或未保存的值映射不正确)
  • 尝试获取锁时发现死锁;尝试重新启动交易

我尝试引入行级锁定,但它根本不起作用。我相信锁被忽略了或者行根本没有被锁定

$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service 
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
  • 自动提交已禁用
  • 事务隔离更改为读提交
  • 尝试使用主键SELECT what FROM tables WHERE conditions FOR UPDATE进行行级锁定

也尝试了 table 级锁定等效解决方案以及使用单个线程持久化数据,但它无法处理我拥有的数据量。

我寻求的解决方案是将提要处理与持久性进行线程级隔离,方法是多个线程处理传入的数据提要并为另一组线程创建实体对象以将它们保存在数据库中。这使我能够进行实验并为我的平台找到每个区域的最佳线程数,例如目前,我正在测试 8 个线程处理传入的提要并为另外 4 个线程创建实体对象,这些线程负责将它们保存在数据库中。对于持久线程,我在应用层引入了一些智能隔离和自定义锁定实体集,以确保没有两个线程试图同时更新同一行。这似乎可行,我现在只需要为这两个区域找到合适的线程数。

这是消费者class,它为数据库编写者

生成积压
    protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

    private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
            for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
                persister.saveAll(dao, entry.getKey(), entry.getValue());
            }
    }

这是数据库编写者的积压工作

    private LinkedBlockingQueue<Key> keys;
    private Map<Key, Set> backlog;

    public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
        Key<ENTITY> key = Key.get(dao, bucket);
        synchronized (key) {
            synchronized (backlog) {
                if (backlog.containsKey(key)) {
                    backlog.get(key).addAll(entitySet);
                } else {
                    backlog.put(key, entitySet);
                    try {
                        keys.put(key);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
    }

这是DB writer的核心

    private void processDBBatchUpdate(Key key) {
        synchronized (key) {
            Set set;
            synchronized (backlog) {
                set = backlog.remove(key);
            }

            key.getDao().saveAll(set);
        }
    }

这是锁定class的钥匙

    private IDefaultEntityDAO<ENTITY> dao;
    private String bucket;
    private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

    private Key(IDefaultEntityDAO dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        if (!keys.containsKey(dao)) {
            keys.put(dao, new HashMap<>());
        }

        if (!keys.get(dao).containsKey(bucket)) {
            keys.get(dao).put(bucket, new Key(dao, bucket));
        }

        return keys.get(dao).get(bucket);
    }