Datstax Cassandra Навальный загрузчик

Я пытаюсь использовать JMX BulkLoader для передачи данных ETL в Cassandra с удаленного узла на кластер

https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java

Однако после успешного установления соединения JMX, похоже, не удается выполнить массовую загрузку.

Обратите внимание, что основная нагрузка передается с удаленного узла в кластер cassandra.

Это выглядит так, как будто кажется, что он будет работать в локальном кластере кассандры (т.е. от локального хоста к кластеру кассандры)

Я что-то здесь упускаю? Может кто-нибудь совет

Исключение ниже

java.lang.IllegalArgumentException: недопустимый каталог /XXXXXXXXX в org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970) в org.apache.cassandra.service.StorageService.ynava: atj3939.reflect.GeneratedMethodAccessor21.invoke(Неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke(Method.javac.mpoline) at6..invoke(MethodUtil.java:75) в sun.reflect.GeneratedMethodAccessor2.invoke(Неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.redjvo.ke 606 invokeM2(StandardMBeanIntrospector.java:46) на com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237) на com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138) на com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java) atj.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) в com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.

class JmxBulkLoader(host: String, port: Int) {

  private var connector: JMXConnector = _

  private var storageBean: StorageServiceMBean = _

  private var timer: Timer = new Timer()

  connect("http://hostip , 7199)

 private def connect(host: String, port: Int) {

    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,

      port))

    Logger.info(" Connected to JMX Entity " + jmxUrl)

    val env = new HashMap[String, Any]()

    connector = JMXConnectorFactory.connect(jmxUrl, env)

    val mbeanServerConn = connector.getMBeanServerConnection

    val name = new ObjectName("org.apache.cassandra.db:type=StorageService")

    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])

  }

  def close() {

    connector.close()

  }

  def bulkLoad(path: String): Boolean = {

    try {

      val timer = new Stopwatch().start

      val result = storageBean.bulkLoadAsync(path)

      timer.stop

      Logger.info("Async Result of Bulk Load " + result)

      Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")

      true

    } catch {

      case e: Exception =>

        Logger.error("Error in Bulk Loading " + e.printStackTrace())

        false

    }

  }

}

2 ответа

Это выглядит так, как будто кажется, что он будет работать в локальном кластере кассандры (т.е. от локального хоста к кластеру кассандры)

Не совсем. Но подумайте об этом: вы вызываете функцию mbean узла Cassandra со строковым параметром. Этот вызов выполняется процессом Cassandra, который вы вызываете (т.е. подключаетесь к нему). Параметр указывает путь на стороне узла, к которому вы подключаетесь.

Вы должны убедиться, что путь к цели существует и содержит ожидаемые данные (например, через общее хранилище или предварительно скопировав файлы).

  1. Таблица должна существовать в Кассандре
  2. Каталог должен быть доступен (локально) для узла Кассандра.
  3. Каталог должен заканчиваться пространством ключей и именем целевой таблицы:
    /some_path/$KeySpaceName/$TableName
Другие вопросы по тегам