Как запустить задачу luigi с помощью spark-submit и pyspark
У меня есть задача Python Luigi, которая включает в себя некоторые библиотеки Pyspark. Теперь я хотел бы представить эту задачу на мезо с помощью spark-submit. Что я должен сделать, чтобы запустить его? Ниже мой скелет кода:
from pyspark.sql import functions as F
from pyspark import SparkContext
class myClass(SparkSubmitTask):
# date = luigi.DateParameter()
def __init__(self, date):
self.date = date # date is datetime.date.today().isoformat()
def output(self):
def input(self):
def run(self):
# Some functions are using pyspark libs
if __name__ == "__main__":
luigi.run()
Без luigi я отправляю эту задачу в виде следующей командной строки:
/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py
Теперь проблема в том, как я могу инициировать отправку задачи luigi, которая включает в себя командную строку luigi, такую как:
luigi --module my_module myClass --local-scheduler --date 2016-01
Еще один вопрос: если у my_module.py есть необходимая задача, чтобы закончить сначала, мне нужно сделать что-то еще для нее или просто установить такую же, как текущая командная строка?
Я очень благодарен за любые подсказки или предложения для этого. Спасибо большое.
1 ответ
У Луиджи есть несколько шаблонных задач. Один из них называется PySparkTask. Вы можете наследовать от этого класса и переопределить свойства:
https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.
Я не проверял это, но, основываясь на моем опыте с Луиджи, я бы попробовал это:
import my_module
class MyPySparkTask(PySparkTask):
date = luigi.DateParameter()
@property
def name(self):
return self.__class__.__name__
@property
def master(self):
return 'mesos://host:port'
@property
def deploy_mode(self):
return 'cluster'
@property
def total_executor_cores(self):
return 1
@property
def driver_cores(self):
return 1
@property
def executor-memory(self):
return 1G
@property
def driver-memory(self):
return 1G
def main(self, sc, *args):
my_module.run(sc)
def self.app_options():
return [date]
Затем вы можете запустить его с помощью: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01
Существует также возможность установить свойства в файле client.cfg, чтобы сделать их значениями по умолчанию для других PySparkTasks:
[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G