Планирование загрузки из BigQuery в MongoDB с использованием apache-airflow ( cloud-composer)

Я пытаюсь настроить конвейеры данных, которые перемещают данные из GCS в BigQuery, выполняют определенные задачи / обработку и загружают их в кластер MongoDB (все настроены в python с использованием DAG). Я был в состоянии достичь этого вплоть до загрузки в MongoDB. Существуют ли операторы воздушного потока, которые могут это сделать? Если нет, то возможно ли создать собственный код, используя хуки mongoDB, предоставляемые в потоке воздуха?

Спасибо GT

2 ответа

Он считается антипаттерном для передачи больших объемов данных в XCOM. Я бы порекомендовал записывать данные из BigQuery в долговременную службу хранения, такую ​​как Cloud Storage, а затем загружать их в MongoDB.

Самый простой / быстрый способ - использовать PythonOperator и получить доступ к необходимому объекту крючка напрямую.

Если вам нужно делать это часто, я бы рекомендовал упаковывать код как пользовательский оператор.

Я наконец заставил его работать из не очень элегантного кода. Я в основном использую BigQueryGetDataOperator здесь, чтобы получить данные из таблицы BQ и MongoHook здесь, чтобы вставить его в MongoDB. Мне все еще нужно проверить это для больших объемов данных и производительности. Я думал, что поделюсь с кем-то таким же новичком, как и я, с Mongo и Airflow.

from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_get_data
from airflow.contrib.operators import MongoHook
def get_dlist(**kwargs):
  import logging as log
  #Import pymongo
  from pymongo import MongoClient
  #Pull the data saved in XCom
  value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')

  header = ['V1','V2']
  data=[]
  for rows in value:
    onerow={}
    for i,f in zip(range(len(rows['f'])),rows['f']):
      onerow[header[i]] = f['v']
    data.append(onerow)
  log.info("Pulled...")
  log.info(data)
  log.info("Pushing into mongodb...")
  client = MongoClient(localhost:27017)
  db = client.test
  collection = db.testingbq2mongo
  collection.insert(data)
  log.info("Written to mongoDB...")
  client.close()

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date':yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    #'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
}

try:
  # [START composer_quickstart_schedule]
  with models.DAG(
        'composer_testing00',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]
  data_list = bigquery_get_data.BigQueryGetDataOperator(\
task_id='get_data_in_list_from_bq',\
dataset_id='testcomposer',\ # Name of the dataset which contains the table ( a BQ terminology)
table_id='summarized_sample_T1' # Name of the BQ table you want to push into MongoDB
)

  op_push2mongo = PythonOperator(task_id='Push_to_MongoDB', python_callable=get_dlist, provide_context=True)
Другие вопросы по тегам