Как мы можем использовать SFTPToGCSOperator в среде композитора GCP (1.10.6)?
Здесь я хочу использовать SFTPToGCSOperator в среде композитора (1.10.6) GCP. Я знаю, что есть ограничение, потому что оператор присутствует только в последней версии воздушного потока, а не в последней версии композитора 1.10.6.
См. Ссылку -https://airflow.readthedocs.io/en/latest/howto/operator/gcp/sftp_to_gcs.html
Я нашел альтернативу оператору и создал класс плагина. Но снова я столкнулся с проблемой для класса sftphook. Теперь я использую старую версию класса sftphook.
см. ссылку ниже -
из airflow.contrib.hooks.sftp_hook импортировать SFTPHookhttps://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html
Я создал класс плагина, позже он импортируется в мой скрипт DAG. Он работает нормально только тогда, когда мы перемещаем один файл. В этом случае нам нужно передать полный путь к файлу с расширением.
Пожалуйста, обратитесь к примеру ниже (он отлично работает в этом сценарии)
DIR = "/test/sftp_dag_test/source_dir"
OBJECT_SRC_1 = "file.csv"
source_path=os.path.join(DIR, OBJECT_SRC_1),
Кроме этого. Если мы используем подстановочный знак, я имею в виду, что если мы хотим переместить все файлы из каталога, я получаю ошибку для метода get_tree_map.
Пожалуйста, смотрите ниже код DAG
import os
from airflow import models
from airflow.models import Variable
from PluginSFTPToGCSOperator import SFTPToGCSOperator
#from airflow.contrib.operators.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)}
DIR_path = "/main_dir/sub_dir/"
BUCKET_SRC = "test-gcp-bucket"
with models.DAG(
"dag_sftp_to_gcs", default_args=default_args, schedule_interval=None
) as dag:
copy_sftp_to_gcs = SFTPToGCSOperator(
task_id="t_sftp_to_gcs",
sftp_conn_id="test_sftp_conn",
gcp_conn_id="google_cloud_default",
source_path=os.path.join(DIR_path, "*.gz"),
destination_bucket=BUCKET_SRC,
)
copy_sftp_to_gcs
Здесь мы используем подстановочный знак * в скрипте DAG, см. Ниже класс плагина.
import os
from tempfile import NamedTemporaryFile
from typing import Optional, Union
from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults
WILDCARD = "*"
class SFTPToGCSOperator(BaseOperator):
template_fields = ("source_path", "destination_path", "destination_bucket")
@apply_defaults
def __init__(
self,
source_path: str,
destination_bucket: str = "destination_bucket",
destination_path: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "sftp_conn_plugin",
delegate_to: Optional[str] = None,
mime_type: str = "application/octet-stream",
gzip: bool = False,
move_object: bool = False,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.source_path = source_path
self.destination_path = self._set_destination_path(destination_path)
print('destination_bucket : ',destination_bucket)
self.destination_bucket = destination_bucket
self.gcp_conn_id = gcp_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip
self.sftp_conn_id = sftp_conn_id
self.move_object = move_object
def execute(self, context):
print("inside execute")
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
sftp_hook = SFTPHook(self.sftp_conn_id)
if WILDCARD in self.source_path:
total_wildcards = self.source_path.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in source_path parameter. "
"Found {} in {}.".format(total_wildcards, self.source_path)
)
print('self.source_path : ',self.source_path)
prefix, delimiter = self.source_path.split(WILDCARD, 1)
print('prefix : ',prefix)
base_path = os.path.dirname(prefix)
print('base_path : ',base_path)
files, _, _ = sftp_hook.get_tree_map(
base_path, prefix=prefix, delimiter=delimiter
)
for file in files:
destination_path = file.replace(base_path, self.destination_path, 1)
self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)
else:
destination_object = (
self.destination_path
if self.destination_path
else self.source_path.rsplit("/", 1)[1]
)
self._copy_single_object(
gcs_hook, sftp_hook, self.source_path, destination_object
)
def _copy_single_object(
self,
gcs_hook: GoogleCloudStorageHook,
sftp_hook: SFTPHook,
source_path: str,
destination_object: str,
) -> None:
"""
Helper function to copy single object.
"""
self.log.info(
"Executing copy of %s to gs://%s/%s",
source_path,
self.destination_bucket,
destination_object,
)
with NamedTemporaryFile("w") as tmp:
sftp_hook.retrieve_file(source_path, tmp.name)
print('before upload self det object : ',self.destination_bucket)
gcs_hook.upload(
self.destination_bucket,
destination_object,
tmp.name,
self.mime_type,
)
if self.move_object:
self.log.info("Executing delete of %s", source_path)
sftp_hook.delete_file(source_path)
@staticmethod
def _set_destination_path(path: Union[str, None]) -> str:
if path is not None:
return path.lstrip("/") if path.startswith("/") else path
return ""
@staticmethod
def _set_bucket_name(name: str) -> str:
bucket = name if not name.startswith("gs://") else name[5:]
return bucket.strip("/")
class SFTPToGCSOperatorPlugin(AirflowPlugin):
name = "SFTPToGCSOperatorPlugin"
operators = [SFTPToGCSOperator]
Итак, этот класс подключаемого модуля я импортирую в свой сценарий DAG, и он отлично работает, когда мы используем имя файла, потому что код находится внутри условия else.
Но когда мы используем подстановочный знак, у нас есть курсор внутри условия if, и я получаю сообщение об ошибке для метода get_tree_map.
см. ошибку ниже -
ERROR - 'SFTPHook' object has no attribute 'get_tree_map'
Я нашел причину этой ошибки, этого метода нет в композиторе (воздушный поток 1.10.6) - https://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html
Этот метод присутствует в последней версии воздушного потока https://airflow.readthedocs.io/en/latest/_modules/airflow/providers/sftp/hooks/sftp.html
Теперь, что я могу попробовать, есть ли альтернатива этому методу или альтернатива этому классу операторов.
Кто-нибудь знает, есть ли для этого решение?
Заранее спасибо.
Не обращайте внимания на опечатку или ошибку отступа в stackru. В моем коде нет ошибки отступа.
2 ответа
Пакеты «поставщики» доступны только в Airflow 2.0, который еще недоступен в Cloud Composer (на момент написания этого поста последним доступным образом Airflow является 1.10.14, выпущенный сегодня утром).
НО вы можете импортировать пакеты обратного переноса, которые позволят вам пользоваться этими новыми пакетами в более ранних версиях 1.10.*.
Мои требования.txt:
apache-airflow-backport-providers-ssh==2020.10.29
apache-airflow-backport-providers-sftp==2020.10.29
pysftp>=0.2.9
paramiko>=2.6.0
sshtunnel<0.2,>=0.1.4
Вы можете импортировать пакеты PyPi непосредственно в среду Composer из консоли.
С этими зависимостями я мог бы использовать новейшие
airflow.providers.ssh.operators.ssh.SSHOperator
(ранее
airflow.contrib.operators.ssh_operator.SSHOperator
) и новый
airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSToSFTPOperator
(не имевший аналога в
contrib
операторы).
Наслаждаться!
Использовать SFTPToGCSOperator
в Google Cloud Composer на Airflow версии 1.10.6 нам нужно создать плагин и каким-то образом "взломать" Airflow, скопировав коды операторов / ловушек в один файл, чтобы включить SFTPToGCSOperator
используйте код из версии Airflow 1.10.10.
В последней версии Airflow есть новый airflow.providers
каталог, которого не было в более ранних версиях. Вот почему вы увидели следующую ошибку:No module named airflow.providers
. Все внесенные мной изменения описаны здесь:
Я подготовил рабочий плагин, который вы можете скачать здесь. Перед его использованием мы должны установить следующие библиотеки PyPI в среде Cloud Composer:pysftp
, paramiko
, sshtunnel
.
Я скопировал полностью
SFTPToGCSOperator
код, который начинается в 792 строке. Вы можете видеть, что этот оператор используетGCSHook
:from airflow.providers.google.cloud.hooks.gcs import GCSHook
который тоже нужно скопировать в плагин - начинается с 193-й строки.
Затем,
GCSHook
наследуется отGoogleBaseHook
класс, который мы можем изменить наGoogleCloudBaseHook
доступный в версии Airflow 1.10.6, и импортируйте его:from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
Наконец, необходимо импортировать
SFTPHook
кода в плагин - начинается с 39-й строки, наследуемой отSSHHook
class, мы можем использовать его из версии Airflow 1.10.6, изменив оператор импорта:from airflow.contrib.hooks.ssh_hook import SSHHook
В конце файла вы можете найти определение плагина:
class SFTPToGCSOperatorPlugin(AirflowPlugin):
name = "SFTPToGCSOperatorPlugin"
operators = [SFTPToGCSOperator]
Необходимо создание плагина, так как встроенный оператор Airflow в настоящее время недоступен в версии Airflow 1.10.6 (последней в Cloud Composer). Вы можете следить за списками версий Cloud Composer, чтобы узнать, когда будет доступна для использования новейшая версия Airflow.
Надеюсь, вы найдете приведенную выше информацию полезной.