Как правильно получить доступ к dbutils в Scala при использовании Databricks Connect
Я использую Databricks Connect для локального запуска кода в моем кластере Azure Databricks из IntelliJ IDEA (Scala).
Все нормально работает. Я могу подключаться, отлаживать, проверять локально в среде IDE.
Я создал задание Databricks для запуска своего пользовательского JAR-файла приложения, но оно не выполняется из-за следующего исключения:
19/08/17 19:20:26 ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: com/databricks/service/DBUtils$
at Main$.<init>(Main.scala:30)
at Main$.<clinit>(Main.scala)
Строка 30 моего класса Main.scala:
val dbutils: DBUtils.type = com.databricks.service.DBUtils
Точно так же, как это описано на этой странице документации
На этих страницах показан способ доступа к DBUtils, который работает как локально, так и в кластере. Но в примере показан только Python, а я использую Scala.
Как правильно получить к нему доступ так, чтобы он работал как локально, используя databricks-connect, так и в задании Databricks, выполняющем JAR?
ОБНОВИТЬ
Кажется, есть два способа использования DBUtils.
1) Описанный здесь класс DbUtils. Цитируя документацию, эта библиотека позволяет создавать и компилировать проект, но не запускать его. Это не позволяет запускать локальный код в кластере.
2) Databricks Connect, описанный здесь. Это позволяет запускать локальный код Spark в кластере Databricks.
Проблема в том, что эти два метода имеют разные настройки и имя пакета. Кажется, нет способа использовать Databricks Connect локально (который недоступен в кластере), но затем добавьте приложение jar с использованием класса DbUtils через sbt/maven, чтобы у кластера был доступ к нему.
2 ответа
Я не знаю, почему упомянутые вами документы не работают. Может быть, вы используете другую зависимость?
В этих документах есть пример приложения, которое вы можете скачать. Это проект с очень минимальным тестом, поэтому он не создает задания и не пытается запустить их в кластере, но это только начало. Также обратите внимание, что он использует более старые0.0.1
версия dbutils-api
.
Итак, чтобы исправить вашу текущую проблему, вместо использования com.databricks.service.DBUtils
попробуйте импортировать dbutils
из другого места:
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
Или, если хотите:
import com.databricks.dbutils_v1.{DBUtilsV1, DBUtilsHolder}
type DBUtils = DBUtilsV1
val dbutils: DBUtils = DBUtilsHolder.dbutils
Также убедитесь, что у вас есть следующая зависимость в SBT (возможно, попробуйте поиграть с версиями, если 0.0.3
не работает - последняя 0.0.4
):
libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.3"
Этот вопрос и ответ указали мне в правильном направлении. Ответ содержит ссылку на рабочее репозиторий Github, в котором используетсяdbutils
: waimak. Я надеюсь, что это репо поможет вам ответить на дополнительные вопросы о конфигурации и зависимостях Databricks.
Удачи!
ОБНОВИТЬ
Я понимаю, у нас есть два похожих, но не идентичных API, и нет хорошего способа переключаться между локальной и серверной версией (хотя Databricks Connect обещает, что он все равно должен работать). Пожалуйста, позвольте мне предложить обходной путь.
Хорошо, что Scala удобна для написания адаптеров. Вот фрагмент кода, который должен работать как мост - вотDBUtils
определенный здесь объект, который обеспечивает достаточную абстракцию API для двух версий API: Databricks Connect, один на com.databricks.service.DBUtils
, и серверная часть com.databricks.dbutils_v1.DBUtilsHolder.dbutils
API. Мы можем добиться этого, загрузив и впоследствии используяcom.databricks.service.DBUtils
через отражение - у нас нет его жестко запрограммированного импорта.
package com.example.my.proxy.adapter
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams
import scala.util.Try
import scala.language.implicitConversions
import scala.language.reflectiveCalls
trait DBUtilsApi {
type FSUtils
type FileInfo
type SecretUtils
type SecretMetadata
type SecretScope
val fs: FSUtils
val secrets: SecretUtils
}
trait DBUtils extends DBUtilsApi {
trait FSUtils {
def dbfs: org.apache.hadoop.fs.FileSystem
def ls(dir: String): Seq[FileInfo]
def rm(dir: String, recurse: Boolean = false): Boolean
def mkdirs(dir: String): Boolean
def cp(from: String, to: String, recurse: Boolean = false): Boolean
def mv(from: String, to: String, recurse: Boolean = false): Boolean
def head(file: String, maxBytes: Int = 65536): String
def put(file: String, contents: String, overwrite: Boolean = false): Boolean
}
case class FileInfo(path: String, name: String, size: Long)
trait SecretUtils {
def get(scope: String, key: String): String
def getBytes(scope: String, key: String): Array[Byte]
def list(scope: String): Seq[SecretMetadata]
def listScopes(): Seq[SecretScope]
}
case class SecretMetadata(key: String) extends DefinedByConstructorParams
case class SecretScope(name: String) extends DefinedByConstructorParams
}
object DBUtils extends DBUtils {
import Adapters._
override lazy val (fs, secrets): (FSUtils, SecretUtils) = Try[(FSUtils, SecretUtils)](
(ReflectiveDBUtils.fs, ReflectiveDBUtils.secrets) // try to use the Databricks Connect API
).getOrElse(
(BackendDBUtils.fs, BackendDBUtils.secrets) // if it's not available, use com.databricks.dbutils_v1.DBUtilsHolder
)
private object Adapters {
// The apparent code copying here is for performance -- the ones for `ReflectiveDBUtils` use reflection, while
// the `BackendDBUtils` call the functions directly.
implicit class FSUtilsFromBackend(underlying: BackendDBUtils.FSUtils) extends FSUtils {
override def dbfs: FileSystem = underlying.dbfs
override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
}
implicit class FSUtilsFromReflective(underlying: ReflectiveDBUtils.FSUtils) extends FSUtils {
override def dbfs: FileSystem = underlying.dbfs
override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
}
implicit class SecretUtilsFromBackend(underlying: BackendDBUtils.SecretUtils) extends SecretUtils {
override def get(scope: String, key: String): String = underlying.get(scope, key)
override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
}
implicit class SecretUtilsFromReflective(underlying: ReflectiveDBUtils.SecretUtils) extends SecretUtils {
override def get(scope: String, key: String): String = underlying.get(scope, key)
override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
}
}
}
object BackendDBUtils extends DBUtilsApi {
import com.databricks.dbutils_v1
private lazy val dbutils: DBUtils = dbutils_v1.DBUtilsHolder.dbutils
override lazy val fs: FSUtils = dbutils.fs
override lazy val secrets: SecretUtils = dbutils.secrets
type DBUtils = dbutils_v1.DBUtilsV1
type FSUtils = dbutils_v1.DbfsUtils
type FileInfo = com.databricks.backend.daemon.dbutils.FileInfo
type SecretUtils = dbutils_v1.SecretUtils
type SecretMetadata = dbutils_v1.SecretMetadata
type SecretScope = dbutils_v1.SecretScope
}
object ReflectiveDBUtils extends DBUtilsApi {
// This throws a ClassNotFoundException when the Databricks Connection API isn't available -- it's much better than
// the NoClassDefFoundError, which we would get if we had a hard-coded import of com.databricks.service.DBUtils .
// As we're just using reflection, we're able to recover if it's not found.
private lazy val dbutils: DBUtils =
Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get().asInstanceOf[DBUtils]
override lazy val fs: FSUtils = dbutils.fs
override lazy val secrets: SecretUtils = dbutils.secrets
type DBUtils = AnyRef {
val fs: FSUtils
val secrets: SecretUtils
}
type FSUtils = AnyRef {
def dbfs: org.apache.hadoop.fs.FileSystem
def ls(dir: String): Seq[FileInfo]
def rm(dir: String, recurse: Boolean): Boolean
def mkdirs(dir: String): Boolean
def cp(from: String, to: String, recurse: Boolean): Boolean
def mv(from: String, to: String, recurse: Boolean): Boolean
def head(file: String, maxBytes: Int): String
def put(file: String, contents: String, overwrite: Boolean): Boolean
}
type FileInfo = AnyRef {
val path: String
val name: String
val size: Long
}
type SecretUtils = AnyRef {
def get(scope: String, key: String): String
def getBytes(scope: String, key: String): Array[Byte]
def list(scope: String): Seq[SecretMetadata]
def listScopes(): Seq[SecretScope]
}
type SecretMetadata = DefinedByConstructorParams { val key: String }
type SecretScope = DefinedByConstructorParams { val name: String }
}
Если вы замените val dbutils: DBUtils.type = com.databricks.service.DBUtils
который вы упомянули в своем Main
с участием val dbutils: DBUtils.type = com.example.my.proxy.adapter.DBUtils
, все должно работать как оперативная замена, как локально, так и удаленно.
Если у вас есть новые NoClassDefFoundError
s, попробуйте добавить определенные зависимости в задание JAR или, возможно, попробуйте переставить их, изменить версии или пометить зависимости как предоставленные.
Этот адаптер некрасивый и использует отражение, но я надеюсь, что он должен быть достаточно хорошим решением. Удачи:)
Для доступа к dbutils.fs и dbutils.secrets Databricks Utilities вы используете модуль DBUtils.
Пример: доступ к DBUtils в программировании на Scala выглядит так:
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
Ссылка: Databricks - доступ к DBUtils.
Надеюсь это поможет.