Как запустить задачу luigi с помощью spark-submit и pyspark

У меня есть задача Python Luigi, которая включает в себя некоторые библиотеки Pyspark. Теперь я хотел бы представить эту задачу на мезо с помощью spark-submit. Что я должен сделать, чтобы запустить его? Ниже мой скелет кода:

from pyspark.sql import functions as F
from pyspark import SparkContext

class myClass(SparkSubmitTask):
# date = luigi.DateParameter()

  def __init__(self, date):
    self.date = date # date is datetime.date.today().isoformat()

  def output(self):

  def input(self):

  def run(self):
    # Some functions are using pyspark libs

if __name__ == "__main__":
  luigi.run()

Без luigi я отправляю эту задачу в виде следующей командной строки:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py

Теперь проблема в том, как я могу инициировать отправку задачи luigi, которая включает в себя командную строку luigi, такую ​​как:

luigi --module my_module myClass --local-scheduler --date 2016-01

Еще один вопрос: если у my_module.py есть необходимая задача, чтобы закончить сначала, мне нужно сделать что-то еще для нее или просто установить такую ​​же, как текущая командная строка?

Я очень благодарен за любые подсказки или предложения для этого. Спасибо большое.

1 ответ

У Луиджи есть несколько шаблонных задач. Один из них называется PySparkTask. Вы можете наследовать от этого класса и переопределить свойства:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.

Я не проверял это, но, основываясь на моем опыте с Луиджи, я бы попробовал это:

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor-memory(self):
        return 1G

    @property
    def driver-memory(self):
        return 1G

    def main(self, sc, *args):
        my_module.run(sc)

    def self.app_options():
        return [date]

Затем вы можете запустить его с помощью: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01

Существует также возможность установить свойства в файле client.cfg, чтобы сделать их значениями по умолчанию для других PySparkTasks:

[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G
Другие вопросы по тегам