Airflow: Как передать оператору переменную в формате json/dict?

Мне нужно знать, как передать зарегистрированный словарь в качестве переменной в параметрах оператора, например, для запуска блокнота из блоков данных.

В моем случае я пробовал некоторые вещи, но не работает.

У меня есть эта переменная, сохраненная в Airflow:

      "dictionaries" : {
                     "dict1" : {
                                "a" : 1,
                                "b" : 2
                               }

и этот код, чтобы попытаться получить переменную

      dict = "dict1"
values = f"{{{{ var.json.dictionaries.{dict} }}}}"

чтобы позже передать его как словарь вместе с другими значениями в параметрах оператора

      task1 = DatabricksRunNowOperator(
            task_id=f'Databricks_{dict1}',
            databricks_conn_id='databricks',
            job_id= 1111,
            notebook_params={"param1": "param1" , **values}

Это не удается, потому что переменная имеет вид str "TypeError: объект str не является сопоставлением", поэтому я попытался использовать библиотеку json, чтобы попытаться преобразовать формат, но мне это не удалось, я получаю сообщение об ошибке " Ожидается имя свойства, заключенное в двойные кавычки"

Поэтому я подозреваю, что это может быть извлечение переменной с одинарными кавычками, я также пытался использовать замену, чтобы попытаться изменить их на двойные, но это тоже не сработало.

      json.loads(values.replace("'","\""))

Использование jinja для извлечения этой переменной может вести себя не так, я немного запутался с этим, я пробовал в последний раз

      values = json.loads(f"{{{{ (var.json.dictionaries.{dict1}).replace('\'','\"') }}}}")
values = json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace('\'','\"'))

также внутри оператора

      notebook_params={"param1": "param1", **json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace('\'','\"'))}

но получите ту же ошибку «json.decoder.JSONDecodeError: ожидается имя свойства, заключенное в двойные кавычки»

Конечно, я неправильно истолковываю способ восстановления переменной и ее преобразования, если кто-то может мне помочь с этим, я был бы признателен.

С уважением

1 ответ

Вам нужно будет сохранить значение переменной, как показано ниже:

      {
 "dictionaries" : {
                   "dict1" : {
                             "a" : 1,
                             "b" : 2
                            }
                   }
}

И когда вы обращаетесь к переменной в коде, передайте deserialize_json=True. Допустим, тогда ваша переменная называлась «json»,

      from airflow.models import Variable
json_data = Variable.get('json', deserialize_json=True)
dict1 = json_data['dictionaries']['dict1']
Другие вопросы по тегам