Может ли "гонка данных" произойти с триггерами в MariaDB с InnoDB?

Мое приложение обрабатывает очень большой объем данных в реальном времени (>200 миллионов в день), и мне нужно агрегировать их в режиме реального времени, чтобы отчеты оставались эффективными. Данные загружаются и, следовательно, обрабатываются сервером случайным образом несколькими потоками.

я использую MariaDB 10.5.6-MariaDB с участием InnoDB 10.5.6

  • Знаете ли вы, является ли триггер потокобезопасным, т.е. может ли вообще произойти гонка за данными? Другими словами, когда 1000 обновлений - только приращение - происходит с одними и теми же столбцами в одной строке в течение секунды на 10 подключений, тогда данные не будут перепутаны, и результат будет таким, как если бы значения были суммированы одним подключением последовательно..

  • Знаете ли вы, как работает блокировка на уровне строк, и если она автоматическая или может быть применена вручную?

Было бы также полезно поделиться некоторыми из ваших соответствующих закладок, потому что я не нашел ничего лаконичного и полезного в Google.

ОБНОВИТЬ

Я добавил триггер после вставки, который создал новую запись в таблице отчетов, если запись не существует, а затем обновил столбцы с помощью оператора обновления. update table set field=value+delta where condition. Базе данных это не понравилось, и приложение - java, hibernate -, которое отправляло данные, тоже не выдержало и начало генерировать исключения:

  • Это совершенно не имеет отношения к строке, которую спящий режим пытался вставить, потому что он не пытался обновить. Очевидно, это исходит от триггера MariaDB: Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
  • Я не уверен, почему это произошло, но я тоже кое-что получил: Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction

1 ответ

Решение

Я обнаружил, что триггер не был потокобезопасным в том смысле, что база данных начала выдавать разные ошибки для одновременного обновления одной и той же строки:

  • Строка была обновлена ​​или удалена другой транзакцией (или сопоставление несохраненных значений было неправильным)
  • Обнаружен тупик при попытке получить блокировку; попробуйте перезапустить транзакцию

Я попытался ввести блокировку на уровне строк, но это вообще не сработало. Я считаю, что блокировка игнорировалась или строки вообще не блокировались

$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service 
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
  • Автофиксация отключена
  • Изоляция транзакции была изменена на фиксацию чтения
  • Пробовала блокировку на уровне строк с помощью SELECT what FROM tables WHERE conditions FOR UPDATE с использованием первичных ключей

Пробовал эквивалентное решение с блокировкой на уровне таблицы с сохранением данных с помощью одного потока, но оно не могло справиться с объемом данных, который у меня был.

Решение, к которому я пришел, - это разделение на уровне потоков обработки фида от персистентности таким образом, что несколько потоков обрабатывают входящий фид данных и создают объекты сущностей для другого набора потоков, чтобы сохранить их в базе данных. Это позволяет мне экспериментировать и находить оптимальное количество потоков в каждой области для моей платформы, как и в настоящее время, я тестирую с 8 потоками, обрабатывающими входящий поток и создающими объекты сущностей для еще 4 потоков, которые отвечают за их сохранение в базе данных. Для постоянных потоков я ввел некоторую интеллектуальную сегрегацию и настраиваемую блокировку набора сущностей на уровне приложения, чтобы гарантировать, что никакие два потока не будут пытаться обновить одну и ту же строку одновременно. Кажется, это работает, теперь мне просто нужно найти правильное количество потоков для обеих областей.

Это потребительский класс, который создает отставание для писателей БД.

    protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

    private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
            for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
                persister.saveAll(dao, entry.getKey(), entry.getValue());
            }
    }

Это отставание для писателей БД

    private LinkedBlockingQueue<Key> keys;
    private Map<Key, Set> backlog;

    public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
        Key<ENTITY> key = Key.get(dao, bucket);
        synchronized (key) {
            synchronized (backlog) {
                if (backlog.containsKey(key)) {
                    backlog.get(key).addAll(entitySet);
                } else {
                    backlog.put(key, entitySet);
                    try {
                        keys.put(key);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
    }

Это ядро ​​писателя БД

    private void processDBBatchUpdate(Key key) {
        synchronized (key) {
            Set set;
            synchronized (backlog) {
                set = backlog.remove(key);
            }

            key.getDao().saveAll(set);
        }
    }

Это ключевой класс для блокировки

    private IDefaultEntityDAO<ENTITY> dao;
    private String bucket;
    private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

    private Key(IDefaultEntityDAO dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        if (!keys.containsKey(dao)) {
            keys.put(dao, new HashMap<>());
        }

        if (!keys.get(dao).containsKey(bucket)) {
            keys.get(dao).put(bucket, new Key(dao, bucket));
        }

        return keys.get(dao).get(bucket);
    }
Другие вопросы по тегам