Datstax Cassandra Навальный загрузчик
Я пытаюсь использовать JMX BulkLoader для передачи данных ETL в Cassandra с удаленного узла на кластер
Однако после успешного установления соединения JMX, похоже, не удается выполнить массовую загрузку.
Обратите внимание, что основная нагрузка передается с удаленного узла в кластер cassandra.
Это выглядит так, как будто кажется, что он будет работать в локальном кластере кассандры (т.е. от локального хоста к кластеру кассандры)
Я что-то здесь упускаю? Может кто-нибудь совет
Исключение ниже
java.lang.IllegalArgumentException: недопустимый каталог /XXXXXXXXX в org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970) в org.apache.cassandra.service.StorageService.ynava: atj3939.reflect.GeneratedMethodAccessor21.invoke(Неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke(Method.javac.mpoline) at6..invoke(MethodUtil.java:75) в sun.reflect.GeneratedMethodAccessor2.invoke(Неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.redjvo.ke 606 invokeM2(StandardMBeanIntrospector.java:46) на com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237) на com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138) на com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java) atj.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) в com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.
class JmxBulkLoader(host: String, port: Int) {
private var connector: JMXConnector = _
private var storageBean: StorageServiceMBean = _
private var timer: Timer = new Timer()
connect("http://hostip , 7199)
private def connect(host: String, port: Int) {
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,
port))
Logger.info(" Connected to JMX Entity " + jmxUrl)
val env = new HashMap[String, Any]()
connector = JMXConnectorFactory.connect(jmxUrl, env)
val mbeanServerConn = connector.getMBeanServerConnection
val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
}
def close() {
connector.close()
}
def bulkLoad(path: String): Boolean = {
try {
val timer = new Stopwatch().start
val result = storageBean.bulkLoadAsync(path)
timer.stop
Logger.info("Async Result of Bulk Load " + result)
Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")
true
} catch {
case e: Exception =>
Logger.error("Error in Bulk Loading " + e.printStackTrace())
false
}
}
}
2 ответа
Это выглядит так, как будто кажется, что он будет работать в локальном кластере кассандры (т.е. от локального хоста к кластеру кассандры)
Не совсем. Но подумайте об этом: вы вызываете функцию mbean узла Cassandra со строковым параметром. Этот вызов выполняется процессом Cassandra, который вы вызываете (т.е. подключаетесь к нему). Параметр указывает путь на стороне узла, к которому вы подключаетесь.
Вы должны убедиться, что путь к цели существует и содержит ожидаемые данные (например, через общее хранилище или предварительно скопировав файлы).
- Таблица должна существовать в Кассандре
- Каталог должен быть доступен (локально) для узла Кассандра.
- Каталог должен заканчиваться пространством ключей и именем целевой таблицы:
/some_path/$KeySpaceName/$TableName