Как динамически масштабировать StarCluster/qsub/EC2 для запуска параллельных заданий на нескольких узлах
Я новичок в использовании Startcluster/qsub/grid engine для запуска параллельных заданий, и я попытался прочитать пару других постов, касающихся того же самого. Я до сих пор не уверен, как построить масштабируемое решение для моих конкретных требований. Я хотел бы принять еще несколько предложений, прежде чем продолжить то же самое.
Вот мои требования:
У меня есть огромный tar-файл [~40 - 50 ГБ, и он может доходить до 100 ГБ] -----> Здесь я мало что могу сделать. Я принимаю этот огромный tar-файл в качестве входных данных.
Я должен распаковать и распаковать его -----> Я запускаю tar xvf tarfilename.tar | параллельный pbzip -d распаковать и распаковать тоже самое.
Вывод этого несжатого файла, скажем, несколько сотен тысяч файлов, около 500000 файлов.
Это несжатые файлы должны быть обработаны. У меня есть модульный код, который может взять каждый файл, обработать его и вывести 5 разных файлов.
Tar-файл ----- параллельная распаковка ---> несжатые файлы ----- параллельная обработка ---> 5 обработанных файлов на файл обрабатывается
В настоящее время у меня есть параллельный скрипт на python, который работает на 16 ядрах, 16 ГБ памяти занимает этот список несжатых файлов и обрабатывает их параллельно.
Проблема в том, как мне плавно масштабировать. Например, если мой код, скажем, работает в течение 10 часов, и я хотел бы добавить еще одну 8-ядерную машину к нему, я не могу сделать это в параллельном Python, поскольку мне нужно было бы заранее знать количество процессоров.
В то же время, когда я динамически добавляю больше узлов в текущий кластер, как насчет доступности данных и операций чтения / записи?
Итак, я начал читать и экспериментировать с Starcluster и qsub. Хотя я вижу, что могу отправить несколько заданий через qsub, как мне сделать так, чтобы входные файлы брались из несжатой входной папки?
Например, могу ли я написать скрипт script.sh, который в цикле for выбирает имена файлов одно за другим и передает их команде qsub? Есть ли другое эффективное решение?
Скажем, если у вас есть 3 машины с 16 ЦП на каждой, и если я отправлю 48 заданий в очередь, qsub автоматически запустит их в разных ЦП кластеров или мне придется использовать параметры параллельной среды, такие как -np orte command, установить число процессоров в каждом кластере соответственно. Нужно ли сделать мой исполняемый скрипт Python MPI исполняемым?
Таким образом, у меня есть несколько сотен тысяч файлов для ввода, я хотел бы отправить их в очереди заданий на многоядерные машины. Если я динамически добавляю больше машин, задания должны автоматически распределяться.
Еще одна важная проблема: мне нужно, чтобы в конце все результаты 500000 нечетных операций были агрегированы? Есть ли предложение о том, как агрегировать вывод параллельных заданий по мере того, как и когда вывод выводится?
Я тестирую несколько сценариев, но мне хотелось бы знать, есть ли люди, которые экспериментировали с подобными сценариями.
Любые предложения, используя плагин Hadoop? http://star.mit.edu/cluster/docs/0.93.3/plugins/hadoop.html
Спасибо заранее Karthick
2 ответа
После некоторого изучения различных вариантов динамического масштабирования я решил использовать механизм очереди, чтобы распределять задания между несколькими работниками.
Job_Manager - считывает ввод, создает задание, добавляет задание в очередь. SQS Очередь - это служба очереди. Рабочие процессы - прослушивает очередь и обрабатывает вывод.
Диски ввода / вывода являются NFS и доступны для всех серверов / клиентов.
Для динамического масштабирования добавьте информацию о клиенте NFS в /exports и перезапустите сервер. Активные клиенты имеют конфигурацию rw, hard, intr в соответствующем fstab. Запуская n рабочих процессов в новом клиенте, в процесс добавляется больше рабочих.
Пока что это надежно и хорошо масштабируется. Мне удалось запустить около 90 рабочих на 3 компьютерах и обработать 200000 файлов менее чем за 5 часов. Ранее это заняло около 24 часов, так как я не мог распределить данные и запустить рабочих на нескольких узлах.
Ввод / вывод и обмен данными. Если у вас низкий уровень ввода-вывода, вы можете оставить свои данные на главном узле и использовать nfs для обмена ими между вашими узлами. Если у вас много операций ввода-вывода, я бы порекомендовал использовать ведро S3.
Распространение: ваш bash-скрипт, запускающий несколько qsub, является правильным решением. Вы можете либо вызвать его в одном файле, либо в нескольких файлах одновременно.
Масштабирование. Рассматривайте параллельные задания, выполняемые в кластере, как разные задачи. Вы должны запустить 1 или более экземпляров приложения на каждом узле. Например: если вы используете узлы cr1.8xlarge, у вас есть 32 ядра. Вы можете запустить 1 экземпляр вашего приложения, используя 32 ядра или 4 экземпляра вашего приложения, используя 8 ядер. См. Конфигурацию "слотов" для каждого узла в Open Grid Engine. (Если вы были более готовы запустить один большой экземпляр вашего приложения, объединяющий ядра нескольких узлов, я никогда этого не делал, поэтому не могу вам с этим помочь.) Затем, чтобы добавить узел, вы можете использовать "addnode" команда от StarCluster. Как только узел подключен, OGS автоматически распределяет там задания. Вы также можете использовать StarCluster loadbalancer для автоматического добавления / удаления узлов.
Итак, вот мое предложение. 1. Распакуйте ваши файлы в S3. 2. Запустите StarCluster. 3. Используя ваш bashscript, выполните qsub задания для каждых нескольких файлов (может быть более эффективно для задания работать, скажем, на 10 файлах, чем для задания для каждого отдельного файла) 4. Ваше приложение должно выполнить ввод / вывод на s3, 5. Когда очередь пуста, посмотрите сценарий на результаты, чтобы убедиться, что все задания выполнялись хорошо. Вы можете перепланировать задания, если выходные данные отсутствуют.
- Я не знаю, как твоя агрегация, поэтому я не могу сказать.
- Я никогда не использовал Hadoop, поэтому я тоже не могу помочь.
- Вам не нужно делать MPI-скрипт Python исполняемым.
- Если вы используете гетерогенный кластер, то вы с самого начала знаете, сколько ядер будет доступно на каждом узле.
- Если вы определяете узел с 32 ядрами, чтобы иметь 4 слота, то вы должны использовать в своих работах максимум 8 ядер каждый.