Работа с NoHostAvailableException с фантомным DSL

При попытке вставить несколько тысяч записей одновременно в удаленную базу данных Cassandra, я воспроизводимо сталкиваюсь с таймаутами (с 5-6 тысячами элементов на медленном соединении)

ошибка:

All host(s) tried for query failed (tried: /...:9042
(com.datastax.driver.core.exceptions.OperationTimedOutException: [/...]
Timed out waiting for server response))
com.datastax.driver.core.exceptions.NoHostAvailableException: 
All host(s) tried for query failed (tried: /...:9042
(com.datastax.driver.core.exceptions.OperationTimedOutException: [/...]
Timed out waiting for server response))

модель:

class RecordModel extends CassandraTable[ConcreteRecordModel, Record] {

  object id extends StringColumn(this) with PartitionKey[String]

...
abstract class ConcreteRecordModel extends RecordModel 
    with RootConnector with ResultSetFutureHelper {

def store(rec: Record): Future[ResultSet] = 
    insert.value(_.id, rec.id).value(...).future()

def store(recs: List[Record]): Future[List[ResultSet]] = Future.traverse(recs)(store)

разъем:

val connector = ContactPoints(hosts).withClusterBuilder(
  _.withCredentials(
    config.getString("username"),
    config.getString("password")
  ).withPoolingOptions(
    new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 4)
      .setMaxConnectionsPerHost(HostDistance.LOCAL, 10)
      .setCoreConnectionsPerHost(HostDistance.REMOTE, 2)
      .setMaxConnectionsPerHost(HostDistance.REMOTE, 4)
      .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
      .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000)
      .setPoolTimeoutMillis(10000)
  )
).keySpace(keyspace)

Я попытался настроить варианты объединения, отдельно и вместе. Но даже удваивая все REMOTE настройки не изменили время ожидания заметно

текущий обходной путь, которого я хотел бы избежать - разбить список на партии и дождаться завершения каждого:

def store(recs: List[Record]): Future[List[ResultSet]] = {
  val rs: Iterator[List[ResultSet]] = recs.grouped(1000) map { slice =>
    Await.result(Future.traverse(slice)(store), 100 seconds)
  }
  Future.successful(rs.to[List].flatten)
}

Что было бы хорошим способом справиться с этой проблемой?

Спасибо

РЕДАКТИРОВАТЬ

Ошибки предполагают сбой / перегрузку кластера, но я подозреваю, что сеть играет здесь важную роль. Числа, приведенные выше, взяты с удаленного компьютера. Они НАМНОГО выше, когда тот же C* подается с машины в том же центре обработки данных. Другая подозрительная деталь заключается в том, что при заполнении одного экземпляра C* одним пером не возникает проблем с тайм-аутом, удаленным или нет.

Что мне действительно не нравится в регулировании, так это то, что размеры пакетов являются случайными и статичными, в то время как они должны быть адаптируемыми.

1 ответ

Похоже, вы попали в пределы своего кластера. Если вы хотите избежать тайм-аутов, вам нужно будет увеличить емкость, чтобы справиться с нагрузкой. Если вы хотите просто выполнить пакетную запись, вам следует ограничить их (как вы это делаете), так как отправка слишком большого количества запросов на слишком мало узлов снизит производительность. Вы также можете увеличить тайм-ауты на стороне сервера (read_request_timeout_in_ms, write_request_timeout_in_ms, request_timeout_in_ms), если вы хотите подождать, пока вы сможете писать, однако это не рекомендуется, поскольку вы не дадите Кассандре время на восстановление и, вероятно, вызовете большое количество ParNew GC.

Другие вопросы по тегам