Необходимо переписать код Scala для определенного вывода JSON

Я пытаюсь зарегистрировать родословную записной книжки Databricks в Azure Purview через spline и apacheatlas api. Есть две версии кода: 1) это исходный код, который использует среду выполнения databricks версии 6.4 и работает так, как ожидалось, но нам нужно запустить его в более новой версии среды выполнения, по крайней мере, версии 7.5 и выше, поэтому есть 2) секунда. версия кода, переработанная для выполнения версии 7.5. В частности, новому коду требуются новые пакеты JSON, но результат (см. Ниже) не соответствует ожидаемому результату исходного кода (также показан ниже). Необходимо переписать код для его правильного выполнения, потому что в текущем новом коде есть ошибки. Спасибо

ОРИГИНАЛЬНЫЙ СТАРЫЙ КОД: исходный код, использующий Databricks Runtime версии 6.4, приведен ниже.

      %scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  
  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)
  
  val notebookInfo = Map("notebookURL" -> notebookURL,  
                "user" -> user, 
                "name" -> name,
                "mounts" -> dbutils.fs.ls("/mnt").map(_.path),
                "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
  
  override protected def userExtraMetadataProvider: UserExtraMetadataProvider = new UserExtraMetadataProvider {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  }
})

ОРИГИНАЛЬНЫЙ ОЖИДАЕМЫЙ ВЫХОД: вот ожидаемый вывод JSON из исходного кода.

      {
  'id': '618c92e1-ae79-491e-b6fd-b5080dc7ef8d',
  'operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/parquet/Customers_new',
      'append': False,
      'id': 0,
      'childIds': [
        1
      ],
      'params': {
        'path': 'dbfs:/mnt/test_data/parquet/Customers_new'
      },
      'extra': {
        'name': 'InsertIntoHadoopFsRelationCommand',
        'destinationType': 'Parquet',
        'foo': 'bar4'
      }
    },
    'reads': [
      {
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],
        'id': 1,
        'schema': [
          'b6112e12-9b90-46db-b919-bcc9c6280759',
          '9f0671fe-813e-4608-a870-adae8386c46e',
          '8dfcf4df-211c-48b0-8dec-1b1486dd0db4'
        ],
        'params': {
          'delimiter': ',',
          'inferschema': 'true',
          'header': 'true',
          'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
        },
        'extra': {
          'name': 'LogicalRelation',
          'sourceType': 'CSV',
          'foo': 'bar3'
        }
      }
    ]
  },
  'systemInfo': {
    'name': 'spark',
    'version': '2.4.5'
  },
  'agentInfo': {
    'name': 'spline',
    'version': '0.5.3'
  },
  'extraInfo': {
    'appName': 'Databricks Shell',
    'dataTypes': [
      {
        '_typeHint': 'dt.Simple',
        'id': 'df02093b-d529-4c8d-b422-9ac468baa765',
        'name': 'integer',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': '88f773f8-982c-4d6c-bed3-1600a99c5943',
        'name': 'string',
        'nullable': True
      }
    ],
    'attributes': [
      {
        'id': 'b6112e12-9b90-46db-b919-bcc9c6280759',
        'name': 'CustomerID',
        'dataTypeId': 'df02093b-d529-4c8d-b422-9ac468baa765'
      },
      {
        'id': '9f0671fe-813e-4608-a870-adae8386c46e',
        'name': 'FirstName',
        'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      },
      {
        'id': '8dfcf4df-211c-48b0-8dec-1b1486dd0db4',
        'name': 'LastName',
        'dataTypeId': '88f773f8-982c-4d6c-bed3-1600a99c5943'
      }
    ],
    'notebookInfo': {
      'obj': {
        'name': 'initialize_spline_original',
        'timestamp': 1623430575561,
        'notebookURL': 'adb-2323242424.azuredatabricks.net/',
        'mounts': [
          'dbfs:/mnt/datalake/',
          'dbfs:/mnt/landing_dde/',
          'dbfs:/mnt/landing_newc/',
          'dbfs:/mnt/landing_sourcedb/',
          'dbfs:/mnt/testmount/',
          'dbfs:/mnt/training/'
        ],
        'user': 'user@test.com'
      }
    }
  }
}

НОВЫЙ КОД: это новый код, в котором используется среда выполнения Databricks версии 7.5, а также обновленные и переработанные пакеты JSON.

          %scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model._
import play.api.libs.json._
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper


val splineConf: Configuration = StandardSplineConfigurationStack(spark)


spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val mapper = new ObjectMapper() with ScalaObjectMapper 
  mapper.registerModule(DefaultScalaModule)
val outerMap = mapper.readValue[Map[String, Object]](notebookInformationJson)
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)

  val notebookInfo = Map("notebookURL" -> (notebookURL),
    "user" -> (user),
    "name" -> (name),
    "mounts" -> (dbutils.fs.ls("/mnt").map(_.path)),
    "timestamp" -> (System.currentTimeMillis))

 val mapper1 = new ObjectMapper()
 val notebookInfoJson = mapper1.writeValueAsString(notebookInfo)

  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})

НОВЫЙ ВЫВОД: выходные данные объекта JSON для нового кода выглядят следующим образом:

      {
  'id': '12812131038144',
  'operations': {
    'write': {
      'outputSource': 'dbfs:/mnt/test_data/delta/customers_join_orders_new',
      'append': False,
      'id': 0,
      'childIds': [
        1
      ],
      'params': {
        'path': 'dbfs:/mnt/test_data/delta/customers_join_orders_new'
      },
      'extra': {
        'destinationType': 'tahoe',
        'foo': 'bar4',
        'name': 'SaveIntoDataSourceCommand'
      }
    },
    'reads': [
      {
        'childIds': [
          
        ],
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Orders.csv'
        ],
        'id': 4,
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023'
        ],
        'params': {
          'inferschema': 'true',
          'header': 'true',
          'delimiter': ',',
          'path': 'dbfs:/mnt/test_data/csv/Orders.csv'
        },
        'extra': {
          'sourceType': 'csv',
          'foo': 'bar3',
          'name': 'LogicalRelation'
        }
      },
      {
        'childIds': [
          
        ],
        'inputSources': [
          'dbfs:/mnt/test_data/csv/Customers.csv'
        ],
        'id': 6,
        'schema': [
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'inferschema': 'true',
          'header': 'true',
          'delimiter': ',',
          'path': 'dbfs:/mnt/test_data/csv/Customers.csv'
        },
        'extra': {
          'sourceType': 'csv',
          'foo': 'bar3',
          'name': 'LogicalRelation'
        }
      }
    ],
    'other': [
      {
        'id': 3,
        'childIds': [
          4
        ],
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023'
        ],
        'params': {
          'identifier': 'orders'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'SubqueryAlias'
        }
      },
      {
        'id': 5,
        'childIds': [
          6
        ],
        'schema': [
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'identifier': 'customers'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'SubqueryAlias'
        }
      },
      {
        'id': 2,
        'childIds': [
          3,
          5
        ],
        'schema': [
          '2018',
          '2019',
          '2020',
          '2021',
          '2022',
          '2023',
          '2065',
          '2066',
          '2067'
        ],
        'params': {
          'condition': {
            '_typeHint': 'expr.Binary',
            'symbol': '=',
            'dataTypeId': '05ffb715-9781-4019-aee3-21c77f80d2a1',
            'children': [
              {
                '_typeHint': 'expr.AttrRef',
                'refId': '2019'
              },
              {
                '_typeHint': 'expr.AttrRef',
                'refId': '2065'
              }
            ]
          },
          'hint': '',
          'joinType': 'INNER'
        },
        'extra': {
          'foo': 'bar5',
          'name': 'Join'
        }
      },
      {
        'id': 1,
        'childIds': [
          2
        ],
        'schema': [
          '2065',
          '2020',
          '2022',
          '2023'
        ],
        'params': {
          'projectList': [
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2065'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2020'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2022'
            },
            {
              '_typeHint': 'expr.AttrRef',
              'refId': '2023'
            }
          ]
        },
        'extra': {
          'foo': 'bar5',
          'name': 'Project'
        }
      }
    ]
  },
  'systemInfo': {
    'name': 'spark',
    'version': '3.0.1'
  },
  'agentInfo': {
    'name': 'spline',
    'version': '0.6.0'
  },
  'extraInfo': {
    'appName': 'Databricks Shell',
    'dataTypes': [
      {
        '_typeHint': 'dt.Simple',
        'id': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca',
        'name': 'integer',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be',
        'name': 'double',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4',
        'name': 'string',
        'nullable': True
      },
      {
        '_typeHint': 'dt.Simple',
        'id': '05ffb715-9781-4019-aee3-21c77f80d2a1',
        'name': 'boolean',
        'nullable': True
      }
    ],
    'notebookInfo': '{"traversableAgain":true,"empty":false}',
    'attributes': [
      {
        'id': '2018',
        'name': 'SalesOrderID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2019',
        'name': 'CustomerID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2020',
        'name': 'OrderQty',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2021',
        'name': 'ProductID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2022',
        'name': 'UnitPrice',
        'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
      },
      {
        'id': '2023',
        'name': 'LineTotal',
        'dataTypeId': 'a982225d-4ad5-49f7-a0d9-dfb90c0ab2be'
      },
      {
        'id': '2065',
        'name': 'CustomerID',
        'dataTypeId': 'bffcb1eb-841b-482f-bdad-f8ebf9db66ca'
      },
      {
        'id': '2066',
        'name': 'FirstName',
        'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      },
      {
        'id': '2067',
        'name': 'LastName',
        'dataTypeId': 'f102fb63-29bb-475c-b98d-85f9a8ddb2d4'
      }
    ]
  }
}

1 ответ

Нет необходимости в явной сериализации/десериализации JSON. Все, что вам нужно сделать, это вытащить необходимые значения из блокнота Contextиспользуя соответствующие методы получения, упакуйте их в любой сериализуемый объект (например, Map) и поместите его непосредственно в дополнительные функции плана выполнения сплайна.

      import org.apache.commons.configuration._

import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.v1_1._

import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._

val notebookInfo = {
  val ctx = dbutils.notebook.getContext
  val tags = ctx.tags
  val notebookName = ctx.notebookPath.get.split("/").last
  val notebookURL = s"${tags("browserHostName")}/?o=${tags("orgId")}${tags("browserHash")}"
  val user = tags("user")
  val mounts = dbutils.fs.ls("/mnt").map(_.path)
  
  Map(
    "notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> notebookName,
    "mounts" -> mounts,
    "timestamp" -> System.currentTimeMillis
  )
}

val splineConf = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  override def postProcessingFilter = new AbstractPostProcessingFilter {
    override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan = {
      plan.withAddedExtra(Map(
        "notebookInfo" -> notebookInfo
      ))
    }
  }
})

Примечание : я не использую UserExtraMetadataProvider(он устарел и будет удален из предстоящего основного выпуска). Вместо этого я использую PostProcessingFilterчто по сути является почти такой же абстракцией, но с расширенными обязанностями. (Подробнее см. — https://github.com/AbsaOSS/spline-spark-agent#filters)

Другие вопросы по тегам