Python: выполнять подпроцесс cat параллельно
Я бегу несколько cat | zgrep
Команды на удаленном сервере и сбор их выходных данных для дальнейшей обработки:
class MainProcessor(mp.Process):
def __init__(self, peaks_array):
super(MainProcessor, self).__init__()
self.peaks_array = peaks_array
def run(self):
for peak_arr in self.peaks_array:
peak_processor = PeakProcessor(peak_arr)
peak_processor.start()
class PeakProcessor(mp.Process):
def __init__(self, peak_arr):
super(PeakProcessor, self).__init__()
self.peak_arr = peak_arr
def run(self):
command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
log_lines = (subprocess.check_output(command, shell=True)).split('\n')
process_data(log_lines)
Это, однако, приводит к последовательному выполнению команд подпроцесса ('ssh ... cat ...'). Второй пик ждет первого до конца и так далее.
Как я могу изменить этот код так, чтобы вызовы подпроцесса выполнялись параллельно, и при этом была возможность собирать выходные данные для каждого из них в отдельности?
2 ответа
Другой подход (а не другое предложение о размещении процессов оболочки в фоновом режиме) заключается в использовании многопоточности.
run
метод, который у вас есть, будет делать что-то вроде этого:
thread.start_new_thread ( myFuncThatDoesZGrep)
Чтобы собрать результаты, вы можете сделать что-то вроде этого:
class MyThread(threading.Thread):
def run(self):
self.finished = False
# Your code to run the command here.
blahBlah()
# When finished....
self.finished = True
self.results = []
Запустите поток, как указано выше в ссылке на многопоточность. Когда ваш объект потока имеет myThread.finished == True, вы можете получить результаты через myThread.results.
Вам не нужно ни того, ни другого multiprocessing
ни threading
для параллельного запуска подпроцессов, например:
#!/usr/bin/env python
from subprocess import Popen
# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]
он запускает 5 команд оболочки одновременно. Примечание: ни темы, ни multiprocessing
модуль используется здесь. Нет смысла добавлять амперсанд &
Команды оболочки: Popen
не ждет завершения команды Вам нужно позвонить .wait()
в явном виде.
Это удобно, но нет необходимости использовать потоки для сбора выходных данных из подпроцессов:
#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT
# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
for i in range(5)]
# collect output in parallel
def get_lines(process):
return process.communicate()[0].splitlines()
outputs = Pool(len(processes)).map(get_lines, processes)
Связанный: Python продвигает несколько подпроцессов bash?,
Вот пример кода, который получает выходные данные от нескольких подпроцессов одновременно в одном потоке:
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT
@asyncio.coroutine
def get_lines(shell_command):
p = yield from asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
return (yield from p.communicate())[0].splitlines()
if sys.platform.startswith('win'):
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
.format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()