Выполнение загрузки чтения на Кассандре с получением BusyPoolException

Я пытаюсь выполнить большое количество запросов с использованием Phantom версии 2.14.1, например:

case class Foo(id: String, x: Long, y: Long)
val list: List[Foo] = _
list.size = 100000
def find(id: String, x: Long, y:Long )
    select
      .where(_.id eqs id)
      .and(_.ts >= x)
      .and(_.ts < y)
      .fetch()
  }

list.map(f => find(f.id, f.x, f.y)

Я получаю это исключение:

[pool-2-thread-91] ERROR com.outworkers.phantom - Failed to execute query SELECT * FROM my_table WHERE id = 'some_uuid' AND x >= 1503501104 AND y < 1503501224;
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/0:0:0:0:0:0:0:1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [localhost/0:0:0:0:0:0:0:1] Pool is busy (no available connection and the queue has reached its max size 256)))
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:220)
    at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:291)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:358)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:313)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:283)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:118)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:98)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    at com.outworkers.phantom.ScalaGuavaAdapter$.statementToPromise(ScalaGuavaAdapter.scala:70)
    at com.outworkers.phantom.ScalaGuavaAdapter$.statementToFuture(ScalaGuavaAdapter.scala:32)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:90)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
    at com.outworkers.phantom.builder.query.execution.GuavaAdapter$class.fromGuava(ExecutableStatements.scala:44)
    at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
    at com.outworkers.phantom.builder.query.execution.QueryInterface.future(QueryInterface.scala:71)
    at com.outworkers.phantom.builder.query.execution.ResultQueryInterface.fetch(ResultQueryInterface.scala:131)

Я попытался настроить контактные точки, как это было предложено, но кажется, что любое число, которое я добавляю в конфигурацию, не оказывает влияния

val connector = ContactPoint.LOCAL.withClusterBuilder( ).withoutJMXReporting().withoutMetrics().withPoolingOptions(
      new PoolingOptions()
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 1)
        .setMaxConnectionsPerHost(HostDistance.REMOTE, 2)
        .setMaxRequestsPerConnection(HostDistance.LOCAL,100)
        .setMaxRequestsPerConnection(HostDistance.REMOTE,200)
    )
  ).keySpace(KeySpaceSerializer(keyspace).ifNotExists().`with`(replication eqs SimpleStrategy.replication_factor(1))
    .and(durable_writes eqs true))

1 ответ

Решение

Я не думаю, что то, что вы видите здесь, связано с потоковой передачей как таковой, проблема в том, что большой объем запросов монополизирует внутренний пул потоков Java-драйвера Datastax, поэтому в основном в драйвере заканчиваются параллельные соединения с Cassandra.

Это в некоторой степени настраивается.

PoolingOptions.setMaxRequestsPerConnection(HostDistance, int): maximum number of requests per connection;

PoolingOptions.setMaxConnectionsPerHost(HostDistance, int): maximum number of connections in the pool;

PoolingOptions.setMaxQueueSize(int): maximum number of enqueued requests before this exception is thrown.

Вы бы установили их через ClusterBuilder как это.

  val connector = ContactPoint.local
    .noHeartbeat()
    .withClusterBuilder(_.withoutJMXReporting()
      .withoutMetrics().withPoolingOptions(
        new PoolingOptions()
          .setMaxConnectionsPerHost(HostDistance.LOCAL, 15))
          .setMaxRequestsPerConnection(100)
    ).keySpace(KeySpaceSerializer(space).ifNotExists()
      .`with`(replication eqs SimpleStrategy.replication_factor(1))
      .and(durable_writes eqs true)
    )

На самом деле я не проверил правильность всех значений, просто показывая вам конфигурацию DSL.

Другие вопросы по тегам