Как регулировать запись запроса на кассандру при работе с "executeAsync"?

Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, и моя версия кластера cassandra - 2.0.10. Я пишу асинхронно с последовательностью QUORUM.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

Мой вышеупомянутый метод сохранения будет вызываться из нескольких потоков с очень высокой скоростью.

Вопрос:

Я хочу душить просьбу executeAsync метод, который пишет асинхронно в Кассандру. Если я пишу с очень высокой скоростью, чем может справиться мой кластер Cassandra, то он начнет выдавать ошибки, и я хочу, чтобы все мои записи успешно проходили в cassandra без каких-либо потерь.

Я видел этот пост, где решение использовать Semaphore с фиксированным количеством разрешений. Но я не уверен, как и как лучше всего это осуществить. Я никогда не использовал Semaphor раньше. Это логика. Может ли кто-нибудь предоставить пример с семафорной основой в моем коде или, если есть какой-нибудь лучший способ / вариант, то дайте мне знать.

В контексте написания программы для загрузки данных вы можете сделать что-то вроде следующего:

  • Для простоты используйте семафор или другую конструкцию с фиксированным количеством разрешений (это будет вашим максимальным количеством внутренних запросов). Всякий раз, когда вы отправляете запрос, используя executeAsync, приобретайте разрешение. Вам действительно нужен только 1 поток (но может потребоваться ввести пул с размером ядра # CPU, который это делает), который получает разрешения от семафора и выполняет запросы. Он будет блокироваться при получении, пока не будет доступного разрешения.
  • Используйте Futures.addCallback для будущего, возвращенного от executeAsync. Обратный вызов должен вызывать Sempahore.release() в случаях onSuccess и onFailure. Выпуская разрешение, это должно позволить вашей ветке на шаге 1 продолжить и отправить следующий запрос.

Также я видел пару других постов, где они говорили об использовании RingBuffer или же Guava RateLimitter так какой из них лучше, и я должен использовать? Ниже приведены варианты, которые я могу придумать:

  • Использование семафора
  • Использование кольцевого буфера
  • Использование ограничителя скорости гуавы

Может ли кто-нибудь помочь мне с примером того, как мы можем ограничить запрос или получить противодавление для записей Кассандры и убедиться, что все записи успешно проходят в Кассандру?

2 ответа

Решение

Не авторитетный ответ, но, возможно, это будет полезно. Сначала вы должны подумать, что бы вы сделали, если запрос не может быть выполнен сразу же. Независимо от того, какое ограничение скорости вы выбрали, если вы получаете запросы с большей скоростью, чем вы можете написать в Cassandra, в конечном итоге ваш процесс будет забит ожидающими запросами. И в этот момент вам нужно будет сказать своим клиентам, чтобы они на некоторое время задержали свои запросы ("отодвинуть"). Например, если они приходят через HTTP, то статус ответа будет 429 "Слишком много запросов". Если вы генерируете запросы в одном и том же процессе, решите, какое время ожидания допустимо. Тем не менее, если Кассандра не может идти в ногу, то пришло время масштабировать (или настраивать) это.

Может быть, прежде чем вводить ограничения скорости, стоит поэкспериментировать и добавить искусственные задержки в свои потоки перед вызовом save метод (используя Thread.sleep(...)) и посмотрите, решит ли он вашу проблему или что-то еще нужно.

Ошибка возврата запроса - обратное давление от Кассандры. Но вы можете выбрать или реализовать RetryPolicy, чтобы определить, когда повторять неудачные запросы.

Также вы можете посмотреть параметры пула соединений (и особенно мониторинг и настройку пула). Можно настроить количество асинхронных запросов на соединение. Однако в документации сказано, что для Cassandra 2.x этот параметр ограничен 128, и его не следует менять (хотя я бы поэкспериментировал с этим:)

Реализация с семафором выглядит так

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(В реальном коде я бы создал обратный вызов оболочки, который бы высвобождал разрешения, а затем вызывал бы упакованные методы)

RateLimiter Guava похож на семафор, но допускает временные всплески после периодов недоиспользования и ограничивает запросы на основе времени (не общее количество активных запросов).

Однако запросы все равно не будут выполнены по разным причинам, поэтому, вероятно, лучше иметь план, как повторить их (в случае периодических ошибок).

Это может быть неуместно в вашем случае, но я бы попытался использовать некоторую очередь или буфер для постановки в очередь запросов (например, java.util.concurrent.ArrayBlockingQueue). "Буфер заполнен" означает, что клиенты должны ждать или отказаться от запроса. Буфер также будет использоваться для повторной постановки неудачных запросов. Однако, чтобы быть более справедливыми, неудавшиеся запросы, вероятно, должны быть помещены в начало очереди, чтобы они повторялись в первую очередь. Также нужно как-то обрабатывать ситуацию, когда очередь заполнена и одновременно появляются новые неудачные запросы. Затем однопоточный рабочий выбирает запросы из очереди и отправляет их Кассандре. Поскольку он не должен делать много, маловероятно, что он станет узким местом. Этот работник также может применять свои собственные ограничения скорости, например, основанные на com.google.common.util.concurrent.RateLimiter,

Если кто-то хочет избежать как можно большей потери сообщений, он может поставить посредника сообщений с настойчивостью (например, Кафку) перед Кассандрой. Таким образом, входящие сообщения могут пережить даже длительные перебои с Кассандрой. Но, полагаю, в вашем случае это слишком.

Простое использование очереди блокировки должно делать это нормально. Фьючерсы являются многопоточными, и там обратный вызов (успех и неудача) будет действовать как потребитель, и где бы вы ни вызывали метод save, он будет действовать как производитель.

Еще лучше, вы помещаете полный запрос в очередь и удаляете его по очереди, сохраняя при каждой очереди.

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          queue.take();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.take();
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
}

public void invokeSaveInLoop(){
    Object dummyObj = new Object();
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
    for(int i=0; i< 1000; i++){
        save("process", clientid, deviceid, queue);
        queue.put(dummyObj);
    }
}

Если вы хотите пойти дальше и проверить нагрузку на кластер в середине пути

public static String getCurrentState(){    
StringBuilder response = new StringBuilder();
            response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
            final LoadBalancingPolicy loadBalancingPolicy =
                    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
            final PoolingOptions poolingOptions =
                    cluster.getConfiguration().getPoolingOptions();
            Session.State state = session.getState();
            for (Host host : state.getConnectedHosts()) {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                                host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                                connections *
                                        poolingOptions.getMaxRequestsPerConnection(distance)))
                        .append("<br>\n");
            }
            return response.toString();
}
Другие вопросы по тегам