Как правильно обрабатывать два потока, обновляя одну и ту же строку в базе данных
У меня есть тема под названием T1
для чтения плоского файла и его анализа. Мне нужно создать новую тему под названием T2
для разбора некоторой части этого файла и позже этого T2
поток должен будет обновить статус исходного объекта, который также анализируется и обновляется исходным потоком. T1
. Как я могу справиться с этой ситуацией?
Я получаю плоский файл с примерами записей ниже:
AAAA
BBBB
AACC
BBCC
AADD
BBDD
Сначала этот файл сохраняется в базе данных в Received
статус. Теперь все записи начинаются с BB
или с AA
нужно обрабатывать в отдельном потоке. После успешного анализа оба потока попытаются обновить состояние этого файлового объекта в базе данных до Parsed
, В некоторых случаях я получаю staleObjectException
, Изменить: И работа, проделанная любым потоком, прежде чем исключение будет потеряно. Мы используем оптимистическую блокировку. Каков наилучший способ избежать этой проблемы?
Возможные исключения гибернации, когда два потока обновляют один и тот же объект?
Вышеприведенный пост помогает понять некоторую его часть, но не помогает решить мою проблему.
3 ответа
Часть 1 - Ваша проблема - так, как я ее вижу.
Основная причина получения этого исключения заключается в том, что вы используете Hibernate с возможно оптимистичной блокировкой. Это в основном говорит вам, что либо поток T1, либо поток T2 уже обновили состояние до PARSED, и теперь другой поток удерживает старую версию строки с меньшей версией, чем в базе данных, и пытается также обновить состояние до PARSED.,
Большой вопрос здесь: " Два потока пытаются сохранить одинаковые данные?". Если ответ на этот вопрос положительный, то даже если последнее обновление прошло успешно, проблем не должно быть, потому что в конечном итоге они обновляют строку до того же состояния. В этом случае вам не нужна оптимистическая блокировка, потому что ваши данные в любом случае будут синхронизированы.
Основная проблема возникает, если после состояния установлено значение RECIEVED, если два потока T1 и T2 фактически зависят друг от друга при сбросе до следующего состояния. В этом случае вам необходимо убедиться, что если T1 был выполнен первым (или наоборот), T2 необходимо обновить данные для обновленной строки и повторно применить свои изменения на основе изменений, уже выдвинутых T1. В этом случае решение заключается в следующем. Если вы сталкиваетесь с staleObjectException, вам, в основном, нужно обновить данные из базы данных и перезапустить свою работу.
Часть 2 Анализ опубликованной ссылки Возможные исключения гибернации, когда два потока обновляют один и тот же объект? Подход 1, это более или менее последний, чтобы обновить ситуацию Wins. Это более или менее позволяет избежать оптимистической блокировки (подсчета версий). В случае, если у вас нет зависимости от T1 до T2 или наоборот, чтобы установить статус PARSED. Это должно быть хорошо.
**** Aproach 2 ** Оптимистическая блокировка ** Это то, что у вас есть сейчас. Решение состоит в том, чтобы обновить данные и возобновить работу.
Aproach 3 Блокировка БД на уровне строк Решение здесь более или менее такое же, как и для подхода 2, с небольшой коррекцией, которую выдерживает пессимистическая блокировка. Основное отличие состоит в том, что в этом случае это может быть блокировка READ, и вы даже не сможете прочитать данные из базы данных, чтобы обновить их, если это PESSIMISTIC READ.
Синхронизация на уровне приложений Aproach 4 Существует много разных способов сделать синхронизацию. Одним из примеров может быть фактическое расположение всех ваших обновлений в очереди BlockingQueue или JMS (если вы хотите, чтобы они были постоянными) и загрузка всех обновлений из одного потока. Чтобы визуализировать это немного, T1 и T2 будут помещать элементы в очередь, и будут выполняться операции чтения одного потока T3 и их передача на сервер базы данных.
Если вы используете синхронизацию на уровне приложений, вы должны знать, что не все структуры могут быть распределены при развертывании на нескольких серверах.
Ну, пока я не могу думать ни о чем другом:)
Я не уверен, что понимаю вопрос, но кажется, что это будет логическая ошибка для потока T1, который обрабатывает только, например, записи, начинающиеся с AA, чтобы пометить весь файл как "Parsed"? Что произойдет, если, например, ваше приложение аварийно завершит работу после обновлений T1, но пока T2 все еще обрабатывает записи BB? Некоторые записи BB могут быть потеряны, верно?
В любом случае, суть проблемы в том, что у вас есть состояние гонки с двумя потоками, обновляющими один и тот же объект. Исключение устаревшего объекта просто означает, что один из ваших потоков проиграл гонку. Лучшее решение полностью избегает гонки.
(Здесь я предполагаю, что обработка отдельной записи идемпотентна, если это не так, я думаю, что у вас есть большие проблемы, так как некоторые режимы сбоя приведут к повторной обработке записей. Если обработка записи должна произойти один раз и только один раз, то вы есть более сложная проблема, для которой очередь сообщений, вероятно, будет лучшим решением.)
Я бы использовал функциональность java.util.concurrent для отправки записей потоковым работникам, и чтобы поток взаимодействовал с блоком гибернации до тех пор, пока не будут обработаны все записи, после чего этот поток может пометить файл как "Parsed".
Например,
// do something like this during initialization, or use a Guava LoadingCache...
Map<RecordType, Executor> executors = new HashMap<>();
// note I'm assuming RecordType looks like an enum
executors.put(RecordType.AA_RECORD, Executors.newSingleThreadExecutor());
затем, когда вы обрабатываете файл, вы отправляете каждую запись следующим образом, формируя список фьючерсов, соответствующих состоянию поставленных в очередь задач. Давайте предположим, что при успешной обработке записи возвращается логическое "истина":
List<Future<Boolean>> tasks = new ArrayList<>();
for (Record record: file.getRecords()) {
Executor executorForRecord = executors.get(record.getRecordType());
tasks.add(executor.submit(new RecordProcessor(record)));
}
Теперь дождитесь успешного завершения всех задач - есть более элегантные способы сделать это, особенно с Guava. Обратите внимание, что вам также нужно иметь дело с ExecutionException, если ваша задача не выполнена, за исключением, я здесь заостряю внимание.
boolean allSuccess = true;
for (Future<Boolean> task: tasks) {
allSuccess = allSuccess && task.get();
if (!allSuccess) break;
}
// if all your tasks completed successfully, update the file record
if (allSuccess) {
file.setStatus("Parsed");
}
Предполагая, что каждый поток T1,T2 будет анализировать различные части файла, это означает, что никто не отменяет анализ другого потока. Лучше всего отделить процесс разбора от коммита БД.
T1,T2 выполнит синтаксический анализ T3 или Main Thread выполнит коммит после того, как оба T1,T2 закончили. и я думаю, что в этом подходе правильнее изменить статус файла на Parsed
только когда оба потока закончили.
вы можете думать о T3 как о классе CommitService, который ждет, пока T1,T2 не завершит, а затем передаст в БД
CountDownLatch - полезный инструмент для этого. и вот пример