Может ли "гонка данных" произойти с триггерами в MariaDB с InnoDB?
Мое приложение обрабатывает очень большой объем данных в реальном времени (>200 миллионов в день), и мне нужно агрегировать их в режиме реального времени, чтобы отчеты оставались эффективными. Данные загружаются и, следовательно, обрабатываются сервером случайным образом несколькими потоками.
я использую
MariaDB 10.5.6-MariaDB
с участием
InnoDB 10.5.6
Знаете ли вы, является ли триггер потокобезопасным, т.е. может ли вообще произойти гонка за данными? Другими словами, когда 1000 обновлений - только приращение - происходит с одними и теми же столбцами в одной строке в течение секунды на 10 подключений, тогда данные не будут перепутаны, и результат будет таким, как если бы значения были суммированы одним подключением последовательно..
Знаете ли вы, как работает блокировка на уровне строк, и если она автоматическая или может быть применена вручную?
Было бы также полезно поделиться некоторыми из ваших соответствующих закладок, потому что я не нашел ничего лаконичного и полезного в Google.
ОБНОВИТЬ
Я добавил триггер после вставки, который создал новую запись в таблице отчетов, если запись не существует, а затем обновил столбцы с помощью оператора обновления.
update table set field=value+delta where condition
. Базе данных это не понравилось, и приложение - java, hibernate -, которое отправляло данные, тоже не выдержало и начало генерировать исключения:
- Это совершенно не имеет отношения к строке, которую спящий режим пытался вставить, потому что он не пытался обновить. Очевидно, это исходит от триггера MariaDB:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
- Я не уверен, почему это произошло, но я тоже кое-что получил:
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
1 ответ
Я обнаружил, что триггер не был потокобезопасным в том смысле, что база данных начала выдавать разные ошибки для одновременного обновления одной и той же строки:
- Строка была обновлена или удалена другой транзакцией (или сопоставление несохраненных значений было неправильным)
- Обнаружен тупик при попытке получить блокировку; попробуйте перезапустить транзакцию
Я попытался ввести блокировку на уровне строк, но это вообще не сработало. Я считаю, что блокировка игнорировалась или строки вообще не блокировались
$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
- Автофиксация отключена
- Изоляция транзакции была изменена на фиксацию чтения
- Пробовала блокировку на уровне строк с помощью
SELECT what FROM tables WHERE conditions FOR UPDATE
с использованием первичных ключей
Пробовал эквивалентное решение с блокировкой на уровне таблицы с сохранением данных с помощью одного потока, но оно не могло справиться с объемом данных, который у меня был.
Решение, к которому я пришел, - это разделение на уровне потоков обработки фида от персистентности таким образом, что несколько потоков обрабатывают входящий фид данных и создают объекты сущностей для другого набора потоков, чтобы сохранить их в базе данных. Это позволяет мне экспериментировать и находить оптимальное количество потоков в каждой области для моей платформы, как и в настоящее время, я тестирую с 8 потоками, обрабатывающими входящий поток и создающими объекты сущностей для еще 4 потоков, которые отвечают за их сохранение в базе данных. Для постоянных потоков я ввел некоторую интеллектуальную сегрегацию и настраиваемую блокировку набора сущностей на уровне приложения, чтобы гарантировать, что никакие два потока не будут пытаться обновить одну и ту же строку одновременно. Кажется, это работает, теперь мне просто нужно найти правильное количество потоков для обеих областей.
Это потребительский класс, который создает отставание для писателей БД.
protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);
private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
persister.saveAll(dao, entry.getKey(), entry.getValue());
}
}
Это отставание для писателей БД
private LinkedBlockingQueue<Key> keys;
private Map<Key, Set> backlog;
public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
Key<ENTITY> key = Key.get(dao, bucket);
synchronized (key) {
synchronized (backlog) {
if (backlog.containsKey(key)) {
backlog.get(key).addAll(entitySet);
} else {
backlog.put(key, entitySet);
try {
keys.put(key);
} catch (InterruptedException ex) {
}
}
}
}
}
Это ядро писателя БД
private void processDBBatchUpdate(Key key) {
synchronized (key) {
Set set;
synchronized (backlog) {
set = backlog.remove(key);
}
key.getDao().saveAll(set);
}
}
Это ключевой класс для блокировки
private IDefaultEntityDAO<ENTITY> dao;
private String bucket;
private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();
private Key(IDefaultEntityDAO dao, String bucket) {
this.dao = dao;
this.bucket = bucket;
}
public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
if (!keys.containsKey(dao)) {
keys.put(dao, new HashMap<>());
}
if (!keys.get(dao).containsKey(bucket)) {
keys.get(dao).put(bucket, new Key(dao, bucket));
}
return keys.get(dao).get(bucket);
}