Как Cassandra обрабатывает блокировку оператора execute в драйвере Java datastax
Блокирование выполнения fethod из com.datastax.driver.core.Session
public ResultSet execute(Statement statement);
Прокомментируйте этот метод:
Этот метод блокируется до тех пор, пока из базы данных не будет получен хотя бы какой-либо результат. Однако для запросов SELECT это не гарантирует, что результат был получен полностью. Но это гарантирует, что какой-то ответ был получен из базы данных, и, в частности, гарантирует, что, если запрос недействителен, этим методом будет сгенерировано исключение.
Неблокируемый метод выполнения из com.datastax.driver.core.Session
public ResultSetFuture executeAsync(Statement statement);
Этот метод не блокирует. Он возвращается, как только запрос был передан в базовый сетевой стек. В частности, возврат из этого метода не гарантирует, что запрос действителен или даже был передан в действующий узел. Любое исключение, относящееся к сбою запроса, будет выдано при доступе к {@link ResultSetFuture}.
У меня есть 02 вопроса о них, поэтому было бы здорово, если бы вы могли помочь мне понять их.
Допустим, у меня есть 1 миллион записей, и я хочу, чтобы все они были внесены в базу данных (без потерь).
Вопрос 1: Если у меня n потоков, у всех потоков будет одинаковое количество записей, которые они должны отправить в базу данных. Все они продолжают отправлять несколько запросов на вставку в cassandra, используя блокировку вызова execute. Если я увеличу значение n, поможет ли это также ускорить время, необходимое для вставки всех записей в кассандру?
Не вызовет ли это проблемы с производительностью для Кассандры? Должна ли Кассандра удостовериться, что для каждой отдельной записи вставки все узлы в кластерах должны немедленно узнать о новой записи? В целях обеспечения согласованности в данных. (Я предполагаю, что узел Кассандры даже не будет думать об использовании времени локальной машины для управления временем вставки записи).
Вопрос 2: Как выполнить неблокирование, как я могу убедиться, что все вставки выполнены успешно? Единственный известный мне способ - ждать, пока ResultSetFuture проверит выполнение запроса на вставку. Есть ли лучший способ, которым я могу сделать? Есть ли более высокая вероятность того, что неблокирующее выполнение легче завершить, чем блокирующее выполнение?
Большое спасибо за вашу помощь.
2 ответа
Если у меня n потоков, все потоки будут иметь одинаковое количество записей, которое нужно отправить в базу данных. Все они продолжают отправлять несколько запросов на вставку в cassandra, используя блокировку вызова execute. Если я увеличу значение n, поможет ли это также ускорить время, необходимое для вставки всех записей в кассандру?
До некоторой степени Давайте немного разберем детали реализации клиента и посмотрим на вещи с точки зрения "Количество одновременных запросов", так как вам не нужно иметь поток для каждого текущего запроса, если вы используете executeAsync. В ходе моего тестирования я обнаружил, что, хотя большое количество одновременных запросов имеет большое значение, существует порог, для которого наблюдается снижение отдачи или снижение производительности. Мое общее правило (number of Nodes *
native_transport_max_threads (default: 128)
* 2)
, но вы можете найти более оптимальные результаты с большим или меньшим.
Идея заключается в том, что ставить в очередь больше запросов, чем манипулирует cassandra за один раз, не так много. Сокращая количество запросов в полете, вы ограничиваете ненужную перегрузку в соединениях между вашим драйвером-клиентом и cassandra.
Вопрос 2: Как выполнить неблокирование, как я могу убедиться, что все вставки выполнены успешно? Единственный известный мне способ - ждать, пока ResultSetFuture проверит выполнение запроса на вставку. Есть ли лучший способ, которым я могу сделать? Есть ли более высокая вероятность того, что неблокирующее выполнение легче завершить, чем блокирующее выполнение?
Ожидание ResultSetFuture через get
это один маршрут, но если вы разрабатываете полностью асинхронное приложение, вы хотите избежать максимально возможной блокировки. Используя гуаву, ваши два лучших оружия Futures.addCallback
а также Futures.transform
,
Futures.addCallback
позволяет зарегистрироватьFutureCallback
это выполняется, когда водитель получил ответ.onSuccess
исполняется в случае успеха,onFailure
иначе.Futures.transform
позволяет эффективно отобразить возвращенныйResultSetFuture
во что-то еще. Например, если вам нужно только значение 1 столбца, вы можете использовать его для преобразованияListenableFuture<ResultSet>
кListenableFuture<String>
без необходимости блокировать ваш код наResultSetFuture
а затем получить значение String.
В контексте написания программы для загрузки данных вы можете сделать что-то вроде следующего:
- Для простоты используйте
Semaphore
или какой-либо другой конструкции с фиксированным количеством разрешений (это будет ваше максимальное количество запросов в полете). Всякий раз, когда вы идете, чтобы отправить запрос, используяexecuteAsync
, получить разрешение. Вам действительно нужен только 1 поток (но может потребоваться ввести пул с размером ядра # CPU, который это делает), который получает разрешения от семафора и выполняет запросы. Он будет блокироваться при получении, пока не будет доступного разрешения. - использование
Futures.addCallback
на будущее вернулся изexecuteAsync
, Обратный звонок должен позвонитьSempahore.release()
в обоихonSuccess
а такжеonFailure
случаев. Выпуская разрешение, это должно позволить вашей ветке на шаге 1 продолжить и отправить следующий запрос.
Для дальнейшего повышения пропускной способности, вы можете рассмотреть возможность использования BatchStatement
и отправка запросов партиями. Это хороший вариант, если вы сохраняете свои партии небольшими (50-250 - это хорошее число), и если все ваши вставки в пакете имеют один и тот же ключ разделения.
Помимо приведенного выше ответа,
Похоже, execute () вызывает метод executeAsync(оператор).getUninterruptibly(), поэтому независимо от того, управляете ли вы своим собственным "пулом потоков" с помощью execute () и блокируете себя до тех пор, пока выполнение не завершится максимум из n запущенных потоков, ИЛИ с помощью executeAsync() на для всех записей производительность на стороне кассандры должна быть примерно одинаковой, в зависимости от времени выполнения / количества + таймаутов.
Все эти исполнения будут запускать соединения, заимствованные из пула, каждое выполнение имеет streamId на стороне клиента и уведомляет вас через будущее, когда ответ возвращается для этого streamId, ограниченного общим количеством запросов на соединение на стороне клиента и общим количеством запросов, ограниченным потоками чтения на каждом узле, выбранном для выполнения вашего запроса, любое большее число будет помещено в буфер в очереди (не заблокированной), ограниченной ограничением соединения maxQueueSize и maxRequestsPerConnection, любое большее, чем это, должно завершиться сбоем. Прелесть этого в том, что executeAsync() не запускается в новом потоке для каждого запроса / выполнения.
Таким образом, должно быть ограничение на количество запросов, которые могут быть выполнены с помощью execute () или executeAsync(), в execute () вы избегаете превышения этих пределов.
С точки зрения производительности вы начнете видеть штраф, превышающий то, что может обрабатывать каждый узел, так что execute () с пулом хорошего размера имеет смысл для меня. Более того, используйте реактивную архитектуру, чтобы избежать создания большого количества потоков, которые ничего не делают, кроме ожидания, поэтому большое количество потоков приведет к бесполезному переключению контекста на стороне клиента. Для меньшего числа запросов executeAsync() будет лучше, избегая пулов потоков.
DefaultResultSetFuture future = new DefaultResultSetFuture (..., makeRequestMessage (Statement, null));
новый RequestHandler(это, будущее, заявление).sendRequest();