Как правильно получить доступ к dbutils в Scala при использовании Databricks Connect

Я использую Databricks Connect для локального запуска кода в моем кластере Azure Databricks из IntelliJ IDEA (Scala).

Все нормально работает. Я могу подключаться, отлаживать, проверять локально в среде IDE.

Я создал задание Databricks для запуска своего пользовательского JAR-файла приложения, но оно не выполняется из-за следующего исключения:

19/08/17 19:20:26 ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: com/databricks/service/DBUtils$
at Main$.<init>(Main.scala:30)
at Main$.<clinit>(Main.scala)

Строка 30 моего класса Main.scala:

val dbutils: DBUtils.type = com.databricks.service.DBUtils

Точно так же, как это описано на этой странице документации

На этих страницах показан способ доступа к DBUtils, который работает как локально, так и в кластере. Но в примере показан только Python, а я использую Scala.

Как правильно получить к нему доступ так, чтобы он работал как локально, используя databricks-connect, так и в задании Databricks, выполняющем JAR?

ОБНОВИТЬ

Кажется, есть два способа использования DBUtils.

1) Описанный здесь класс DbUtils. Цитируя документацию, эта библиотека позволяет создавать и компилировать проект, но не запускать его. Это не позволяет запускать локальный код в кластере.

2) Databricks Connect, описанный здесь. Это позволяет запускать локальный код Spark в кластере Databricks.

Проблема в том, что эти два метода имеют разные настройки и имя пакета. Кажется, нет способа использовать Databricks Connect локально (который недоступен в кластере), но затем добавьте приложение jar с использованием класса DbUtils через sbt/maven, чтобы у кластера был доступ к нему.

2 ответа

Я не знаю, почему упомянутые вами документы не работают. Может быть, вы используете другую зависимость?

В этих документах есть пример приложения, которое вы можете скачать. Это проект с очень минимальным тестом, поэтому он не создает задания и не пытается запустить их в кластере, но это только начало. Также обратите внимание, что он использует более старые0.0.1 версия dbutils-api.

Итак, чтобы исправить вашу текущую проблему, вместо использования com.databricks.service.DBUtilsпопробуйте импортировать dbutils из другого места:

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

Или, если хотите:

import com.databricks.dbutils_v1.{DBUtilsV1, DBUtilsHolder}

type DBUtils = DBUtilsV1
val dbutils: DBUtils = DBUtilsHolder.dbutils

Также убедитесь, что у вас есть следующая зависимость в SBT (возможно, попробуйте поиграть с версиями, если 0.0.3 не работает - последняя 0.0.4):

libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.3"

Этот вопрос и ответ указали мне в правильном направлении. Ответ содержит ссылку на рабочее репозиторий Github, в котором используетсяdbutils: waimak. Я надеюсь, что это репо поможет вам ответить на дополнительные вопросы о конфигурации и зависимостях Databricks.

Удачи!


ОБНОВИТЬ

Я понимаю, у нас есть два похожих, но не идентичных API, и нет хорошего способа переключаться между локальной и серверной версией (хотя Databricks Connect обещает, что он все равно должен работать). Пожалуйста, позвольте мне предложить обходной путь.

Хорошо, что Scala удобна для написания адаптеров. Вот фрагмент кода, который должен работать как мост - вотDBUtils определенный здесь объект, который обеспечивает достаточную абстракцию API для двух версий API: Databricks Connect, один на com.databricks.service.DBUtils, и серверная часть com.databricks.dbutils_v1.DBUtilsHolder.dbutilsAPI. Мы можем добиться этого, загрузив и впоследствии используяcom.databricks.service.DBUtils через отражение - у нас нет его жестко запрограммированного импорта.

package com.example.my.proxy.adapter

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams

import scala.util.Try

import scala.language.implicitConversions
import scala.language.reflectiveCalls


trait DBUtilsApi {
    type FSUtils
    type FileInfo

    type SecretUtils
    type SecretMetadata
    type SecretScope

    val fs: FSUtils
    val secrets: SecretUtils
}

trait DBUtils extends DBUtilsApi {
    trait FSUtils {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean = false): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean = false): Boolean
        def mv(from: String, to: String, recurse: Boolean = false): Boolean
        def head(file: String, maxBytes: Int = 65536): String
        def put(file: String, contents: String, overwrite: Boolean = false): Boolean
    }

    case class FileInfo(path: String, name: String, size: Long)

    trait SecretUtils {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    case class SecretMetadata(key: String) extends DefinedByConstructorParams
    case class SecretScope(name: String) extends DefinedByConstructorParams
}

object DBUtils extends DBUtils {

    import Adapters._

    override lazy val (fs, secrets): (FSUtils, SecretUtils) = Try[(FSUtils, SecretUtils)](
        (ReflectiveDBUtils.fs, ReflectiveDBUtils.secrets)    // try to use the Databricks Connect API
    ).getOrElse(
        (BackendDBUtils.fs, BackendDBUtils.secrets)    // if it's not available, use com.databricks.dbutils_v1.DBUtilsHolder
    )

    private object Adapters {
        // The apparent code copying here is for performance -- the ones for `ReflectiveDBUtils` use reflection, while
        // the `BackendDBUtils` call the functions directly.

        implicit class FSUtilsFromBackend(underlying: BackendDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class FSUtilsFromReflective(underlying: ReflectiveDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class SecretUtilsFromBackend(underlying: BackendDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }

        implicit class SecretUtilsFromReflective(underlying: ReflectiveDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }
    }
}

object BackendDBUtils extends DBUtilsApi {
    import com.databricks.dbutils_v1

    private lazy val dbutils: DBUtils = dbutils_v1.DBUtilsHolder.dbutils
    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = dbutils_v1.DBUtilsV1
    type FSUtils = dbutils_v1.DbfsUtils
    type FileInfo = com.databricks.backend.daemon.dbutils.FileInfo

    type SecretUtils = dbutils_v1.SecretUtils
    type SecretMetadata = dbutils_v1.SecretMetadata
    type SecretScope = dbutils_v1.SecretScope
}

object ReflectiveDBUtils extends DBUtilsApi {
    // This throws a ClassNotFoundException when the Databricks Connection API isn't available -- it's much better than
    // the NoClassDefFoundError, which we would get if we had a hard-coded import of com.databricks.service.DBUtils .
    // As we're just using reflection, we're able to recover if it's not found.
    private lazy val dbutils: DBUtils =
        Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get().asInstanceOf[DBUtils]

    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = AnyRef {
        val fs: FSUtils
        val secrets: SecretUtils
    }

    type FSUtils = AnyRef {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean): Boolean
        def mv(from: String, to: String, recurse: Boolean): Boolean
        def head(file: String, maxBytes: Int): String
        def put(file: String, contents: String, overwrite: Boolean): Boolean
    }

    type FileInfo = AnyRef {
        val path: String
        val name: String
        val size: Long
    }

    type SecretUtils = AnyRef {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    type SecretMetadata = DefinedByConstructorParams { val key: String }

    type SecretScope = DefinedByConstructorParams { val name: String }
}

Если вы замените val dbutils: DBUtils.type = com.databricks.service.DBUtils который вы упомянули в своем Main с участием val dbutils: DBUtils.type = com.example.my.proxy.adapter.DBUtils, все должно работать как оперативная замена, как локально, так и удаленно.

Если у вас есть новые NoClassDefFoundErrors, попробуйте добавить определенные зависимости в задание JAR или, возможно, попробуйте переставить их, изменить версии или пометить зависимости как предоставленные.

Этот адаптер некрасивый и использует отражение, но я надеюсь, что он должен быть достаточно хорошим решением. Удачи:)

Для доступа к dbutils.fs и dbutils.secrets Databricks Utilities вы используете модуль DBUtils.

Пример: доступ к DBUtils в программировании на Scala выглядит так:

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

Ссылка: Databricks - доступ к DBUtils.

Надеюсь это поможет.

Другие вопросы по тегам