Dse\Exception\RuntimeException: все соединения во всех потоках ввода-вывода заняты

В нашем веб-приложении есть средство для удаления большого количества данных. Мы делаем это, разбивая на страницы все записи, найденные против u_id,

Ключи, которые мы имеем, предназначены для других запросов, которые у нас есть в приложении - в идеале было бы здорово иметь первичный ключ для u_id но это сломало бы все наши другие запросы.

Приведенный ниже метод работает в большинстве случаев, однако после удаления примерно 6-8 миллионов записей мы получаем:

Dse \ Exception \ RuntimeException: все соединения во всех потоках ввода-вывода заняты

Мы также иногда получаем немного другое сообщение об ошибке:

Dse \ Exception \ ReadTimeoutException: истекло время ожидания операции - получено только 0 ответов

Вы заметите в коде ниже usleep(2500000) который останавливает сценарий. Это был наш обходной путь, но было бы хорошо решить эту проблему, так как Кассандра должна быть в состоянии справиться с таким количеством удалений.

$cluster        = \Dse::cluster()
                    ->withDefaultTimeout(3600)
                      ->withContactPoints(env('CA_HOST'))
                        ->build();

$session        = $cluster->connect(env('CONNECT'));
$options        = array('page_size' => 50);
$results        = $session->execute("SELECT * FROM datastore WHERE u_id = $u_id;", $options);
$future_deletes = array();

while (true) {

    foreach ($results as $result) {

      $future_deletes[] = $session->executeAsync("DELETE FROM datastore WHERE record_id = '" . $result['record_id'] . "' AND record_version = " . $result['record_version'] . " AND user_id = " . $result['user_id']);
      $future_deletes[] = $session->executeAsync("UPDATE data_count set u_count = u_count - 1 WHERE u_id = " . $u_id);

    }

    if( !empty($future_deletes) ){
      foreach ($future_deletes as $future_delete) {
          // we will not wait for each result for more than 5 seconds
          $future_delete->get(5);
      }
      //usleep(2500000); //2.5 seconds
    }

    $future_deletes = array();

    if ($results->isLastPage()) {
        break;
    }

    $results = $results->nextPage();

}

//Disconnect
$session = NULL;

Для справки, вот наши таблицы:

CREATE TABLE datastore (id uuid,
    record_id varchar,
    record_version int,
    user_id int,
    u_id int,
    column_1 varchar,
    column_2 varchar,
    column_3 varchar,
    column_4 varchar,
    column_5 varchar,
PRIMARY KEY((record_id), record_version, user_id)
);
CREATE INDEX u_id ON datastore (u_id);

CREATE TABLE data_count (u_id int PRIMARY KEY, u_count counter);

У нас работает сервер с 8 ГБ ОЗУ.

Версия драйвера DSE - 6.0.1.

Заранее спасибо!

1 ответ

Вы должны контролировать, сколько запросов в полете у вас в один и тот же момент времени. Существует ограничение на количество запросов на соединение и количество соединений. Они управляются соответствующими функциями класса Cluster (не может найти достаточно быстро в документах PHP, но он должен быть похож на функции Cluster в драйвере C++, потому что PHP построен поверх драйвера C++).

Другие вопросы по тегам