Google Cloud Composer и Google Cloud SQL
Какие у нас есть способы подключения к экземпляру Google Cloud SQL (MySQL) из недавно представленного Google Cloud Composer? Намерение состоит в том, чтобы получить данные из экземпляра Cloud SQL в BigQuery (возможно, с промежуточным шагом через Cloud Storage).
Может ли облачный SQL-прокси каким-либо образом быть представлен на модулях, входящих в кластер Kubernetes, на котором размещается Composer?
Если нет, можно ли подключить Cloud SQL Proxy с помощью сервисного брокера Kubernetes? -> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker
Следует ли использовать Airflow для планирования и вызова команд GCP API, таких как 1) экспорт таблицы mysql в облачное хранилище 2) чтение экспорта mysql в bigquery?
Возможно, есть другие методы, которые мне не хватает, чтобы сделать это
2 ответа
"Облачный прокси-сервер SQL обеспечивает безопасный доступ к экземплярам Cloud SQL второго поколения без необходимости внесения в белый список IP-адресов или настройки SSL". - Документы Google CloudSQL-Proxy
Cloud SQL Proxy, кажется, является рекомендуемым способом подключения к CloudSQL выше всех остальных. Таким образом, в Composer, начиная с версии 1.6.1, мы можем создать новый модуль Kubernetes Pod для запуска образа gcr.io/cloudsql-docker/gce-proxy:latest, предоставить его через службу, а затем создать соединение в Composer для использования в операторе.
Чтобы настроить:
Следуйте документации Google
Проверьте соединение, используя информацию из Medium Post Арика
Проверьте, что стручок был создан
kubectl get pods --all-namespaces
Убедитесь, что сервис создан
kubectl get services --all-namespaces
Перейти в рабочий узел
kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash
- Проверьте соединение MySQL
mysql -u composer -p --host <service-name>.default.svc.cluster.local
- Проверьте соединение MySQL
Заметки:
Composer теперь использует пространства имен для организации модулей
Стручки в разных пространствах имен не общаются друг с другом, пока вы не дадите им полный путь
<k8-service-name>.<k8-namespace-name>.svc.cluster.local
Создание нового соединения Composer с полным путем позволит успешное соединение
У нас была такая же проблема, но с экземпляром Postgres. Вот что мы сделали и заставили его работать:
создайте развертывание sqlproxy в кластере Kubernetes, где работает поток воздуха. Это была копия существующего airflow-sqlproxy, используемого соединением airflow_db по умолчанию со следующими изменениями в файле развертывания:
- замените все экземпляры airflow-sqlproxy новым именем прокси
- отредактируйте под 'spec: template: spec: container: command: -instances', замените существующее имя экземпляра новым экземпляром, к которому мы хотим подключиться
создайте сервис kubernetes, снова как копию существующего сервиса airflow-sqlproxy, со следующими изменениями:
- замените все экземпляры airflow-sqlproxy новым именем прокси
- в разделе "spec: ports" измените на соответствующий порт (мы использовали 5432 для экземпляра Postgres)
в пользовательском интерфейсе воздушного потока добавьте соединение типа Postgres с установленным хостом к вновь созданному имени службы.
Вы можете следовать этим инструкциям, чтобы запустить новый экземпляр прокси-сервера Cloud SQL в кластере.
Re #3: Это звучит как хороший план. Насколько мне известно, нет оператора Cloud SQL to BigQuery, поэтому вам придется сделать это в два этапа, как вы описали.
Добавление среднего сообщения в комментариях от @Leo на верхний уровень https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53. После того, как вы выполните эту статью и настроите службу, вы можете подключиться из своей DAG с помощью SQLAlchemy следующим образом:
import os
from datetime import datetime, timedelta
import logging
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"
default_args = {
'start_date': datetime(2019, 7, 16)
}
def connect_to_cloud_sql():
'''
Create a connection to CloudSQL
:return:
'''
import sqlalchemy
try:
PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
logger.info("DB URL", PROXY_DB_URL)
engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
for result in engine.execute("SELECT NOW() as now"):
logger.info(dict(result))
except Exception:
logger.exception("Unable to interact with CloudSQL")
dag = DAG(
dag_id="example_sqlalchemy",
default_args=default_args,
# schedule_interval=timedelta(minutes=5),
catchup=False # If you don't set this then the dag will run according to start date
)
t1 = PythonOperator(
task_id="example_sqlalchemy",
python_callable=connect_to_cloud_sql,
dag=dag
)
if __name__ == "__main__":
connect_to_cloud_sql()
Теперь мы можем подключиться к Cloud SQL, не создавая облачный прокси самостоятельно. Оператор создаст его автоматически. Код выглядит так:
from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
export_body = {
'exportContext': {
'fileType': 'CSV',
'uri': EXPORT_URI,
'databases': [DB_NAME],
'csvExportOptions': {
'selectQuery': SQL
}
}
}
default_dag_args = {}
with DAG(
'postgres_test',
schedule_interval='@once',
default_args=default_dag_args) as dag:
sql_export_task = CloudSqlInstanceExportOperator(
project_id=GCP_PROJECT_ID,
body=export_body,
instance=INSTANCE_NAME,
task_id='sql_export_task'
)
Здесь, в ответе Хоффа на аналогичный вопрос, вы можете найти ссылку на то, как Wepay синхронизирует его каждые 15 минут с помощью оператора воздушного потока.
Из указанного ответа:
Посмотрите, как это делает WePay:
Оператор MySQL to GCS выполняет запрос SELECT к таблице MySQL. SELECT извлекает все данные, превышающие (или равные) последний верхний водяной знак. Верхний водяной знак является либо первичным ключом таблицы (если таблица только для добавления), либо столбцом метки времени изменения (если таблица получает обновления). Опять же, оператор SELECT также возвращается на некоторое время назад (или строки), чтобы перехватить потенциально пропущенные строки из последнего запроса (из-за проблем, упомянутых выше).
С Airflow им удается синхронизировать BigQuery с базой данных MySQL каждые 15 минут.