В чем различия между многопоточными и многопроцессорными модулями?
Я учусь пользоваться threading
и multiprocessing
модули в Python для запуска определенных операций параллельно и ускорения моего кода.
Я нахожу это трудным (может быть, потому что у меня нет теоретических знаний об этом), чтобы понять, в чем разница между threading.Thread()
объект и multiprocessing.Process()
один.
Кроме того, мне не совсем понятно, как создать очередь заданий и чтобы только 4 (например) из них работали параллельно, в то время как другие ожидают освобождения ресурсов перед выполнением.
Я нахожу примеры в документации понятными, но не очень исчерпывающими; как только я пытаюсь немного усложнить ситуацию, я получаю много странных ошибок (например, метод, который не может быть рассортирован и т. д.).
Итак, когда я должен использовать threading
а также multiprocessing
модули?
Можете ли вы связать меня с некоторыми ресурсами, которые объясняют концепции этих двух модулей и как правильно их использовать для сложных задач?
7 ответов
То, что говорит Джулио Франко, верно для многопоточности и многопроцессорности в целом.
Однако в Python * есть еще одна проблема: есть глобальная блокировка интерпретатора, которая не позволяет двум потокам в одном и том же процессе одновременно выполнять код Python. Это означает, что если у вас 8 ядер и вы измените код на использование 8 потоков, он не сможет использовать 800% ЦП и работать в 8 раз быстрее; он будет использовать тот же 100% процессор и работать с той же скоростью. (На самом деле, он будет работать немного медленнее, потому что есть дополнительные издержки от потоков, даже если у вас нет общих данных, но пока игнорируйте это.)
Есть исключения из этого. Если тяжелые вычисления вашего кода на самом деле не происходят в Python, но в какой-то библиотеке с пользовательским кодом C, которая выполняет правильную обработку GIL, например, в виде приложения с клочками, вы получите ожидаемое преимущество в производительности от многопоточности. То же самое верно, если тяжелые вычисления выполняются каким-то подпроцессом, который вы запускаете и ждете.
Что еще более важно, есть случаи, когда это не имеет значения. Например, сетевой сервер тратит большую часть своего времени на чтение пакетов из сети, а приложение с графическим интерфейсом тратит большую часть своего времени на ожидание пользовательских событий. Одна из причин использования потоков на сетевом сервере или в приложении с графическим интерфейсом - это возможность выполнять длительные "фоновые задачи", не останавливая основной поток от продолжения обслуживания сетевых пакетов или событий графического интерфейса. И это прекрасно работает с потоками Python. (С технической точки зрения это означает, что потоки Python обеспечивают параллелизм, даже если они не обеспечивают параллелизма ядра.)
Но если вы пишете программу с привязкой к процессору на чистом Python, использование большего количества потоков обычно не помогает.
Использование отдельных процессов не имеет таких проблем с GIL, потому что каждый процесс имеет свой отдельный GIL. Конечно, между потоками и процессами у вас все еще есть те же компромиссы, что и в любых других языках - разделять данные между процессами труднее и дороже, чем между потоками, может быть дорого запускать огромное количество процессов или создавать и уничтожать их часто и т. д. Но GIL сильно влияет на баланс процессов, что, например, не относится к C или Java. Таким образом, вы обнаружите, что используете многопроцессорность гораздо чаще в Python, чем в C или Java.
Между тем, философия Python "включенные батареи" приносит некоторые хорошие новости: очень легко написать код, который можно переключать между потоками и процессами с помощью смены одной строки.
Если вы разрабатываете свой код в терминах автономных "заданий", которые ничего не делят с другими заданиями (или основной программой), кроме ввода и вывода, вы можете использовать concurrent.futures
библиотека для написания кода вокруг пула потоков:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(job, argument)
executor.map(some_function, collection_of_independent_things)
# ...
Вы даже можете получить результаты этих заданий и передать их на дальнейшие задания, ждать, пока все будет в порядке выполнения или в порядке завершения и т. Д.; прочитайте раздел Future
объекты для деталей.
Теперь, если окажется, что ваша программа постоянно использует процессор на 100%, а добавление большего количества потоков просто замедляет ее, то вы столкнулись с проблемой GIL, поэтому вам нужно переключиться на процессы. Все, что вам нужно сделать, это изменить эту первую строку:
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
Единственное реальное предостережение в том, что аргументы и возвращаемые значения ваших заданий должны быть легко перестраиваемыми (и не требовать слишком много времени или памяти для перебора), чтобы их можно было использовать в перекрестном процессе. Обычно это не проблема, но иногда это так.
Но что, если ваша работа не может быть самостоятельной? Если вы можете разработать свой код с точки зрения задач, которые передают сообщения от одного к другому, это все еще довольно легко. Возможно, вам придется использовать threading.Thread
или же multiprocessing.Process
вместо того, чтобы полагаться на пулы. И вам придется создать queue.Queue
или же multiprocessing.Queue
объекты явно. (Существует множество других опций - каналы, сокеты, файлы со скоплениями,... но дело в том, что вы должны сделать что-то вручную, если автоматическая магия исполнителя недостаточна.)
Но что, если вы даже не можете положиться на передачу сообщений? Что делать, если вам нужно две работы, чтобы изменить одну и ту же структуру и увидеть изменения друг друга? В этом случае вам потребуется выполнить ручную синхронизацию (блокировки, семафоры, условия и т. Д.) И, если вы хотите использовать процессы, явные объекты совместной памяти для загрузки. Это когда многопоточность (или многопроцессорность) становится сложной. Если вы можете избежать этого, прекрасно; если вы не можете, вам нужно будет прочитать больше, чем кто-либо может вставить в SO-ответ.
Из комментария вы хотели узнать, что отличается между потоками и процессами в Python. Действительно, если вы прочитаете ответ Джулио Франко и мой, и все наши ссылки, это должно охватить все... но резюме определенно было бы полезно, так что здесь идет:
- Потоки обмениваются данными по умолчанию; процессы нет.
- Как следствие (1), отправка данных между процессами, как правило, требует их сортировки. **
- Как еще одно следствие (1), прямой обмен данными между процессами обычно требует перевода их в низкоуровневые форматы, такие как Value, Array и
ctypes
типы. - Процессы не подлежат GIL.
- На некоторых платформах (в основном Windows) процессы создания и уничтожения намного дороже.
- Существуют некоторые дополнительные ограничения для процессов, некоторые из которых отличаются на разных платформах. См. Руководство по программированию для деталей.
-
threading
Модуль не имеет некоторых особенностейmultiprocessing
модуль. (Ты можешь использоватьmultiprocessing.dummy
чтобы получить большую часть недостающего API поверх потоков, или вы можете использовать модули более высокого уровня, такие какconcurrent.futures
и не переживай.)
* Это на самом деле не Python, язык, который имеет эту проблему, а CPython, "стандартная" реализация этого языка. Некоторые другие реализации не имеют GIL, например, Jython.
** Если вы используете метод fork start для многопроцессорной обработки, что возможно на большинстве платформ, отличных от Windows, каждый дочерний процесс получает любые ресурсы, которые имел родительский процесс при запуске дочернего процесса, что может быть еще одним способом передачи данных дочерним процессам.
Несколько потоков могут существовать в одном процессе. Потоки, принадлежащие одному и тому же процессу, совместно используют одну и ту же область памяти (могут считывать и записывать одни и те же переменные и могут мешать друг другу). Напротив, разные процессы живут в разных областях памяти, и у каждого из них есть свои переменные. Для связи процессы должны использовать другие каналы (файлы, каналы или сокеты).
Если вы хотите распараллелить вычисления, вам, вероятно, понадобится многопоточность, потому что вы, вероятно, хотите, чтобы потоки взаимодействовали в одной и той же памяти.
Говоря о производительности, потоки быстрее создаются и управляются, чем процессы (потому что ОС не требуется выделять целую новую область виртуальной памяти), а межпотоковое взаимодействие обычно быстрее, чем межпроцессное взаимодействие. Но потоки сложнее программировать. Потоки могут мешать друг другу и могут записывать в память друг друга, но способ, которым это происходит, не всегда очевиден (из-за нескольких факторов, в основном переупорядочения команд и кэширования памяти), и поэтому вам понадобятся примитивы синхронизации для управления доступом к вашим переменным.
Цитаты из документации Python
Я выделил ключевые цитаты из документации Python о процессах и потоках и GIL по адресу: Что такое глобальная блокировка интерпретатора (GIL) в CPython?
Процесс против экспериментов с потоком
Я провел небольшое тестирование, чтобы более конкретно показать разницу.
В тесте я рассчитал время работы ЦП и ввода-вывода для различного количества потоков на 8-ми гиперпоточном ЦП. Работа, выполняемая для каждого потока, всегда одна и та же, поэтому большее количество потоков означает больше общей работы.
Результаты были:
https://stackru.com/images/8f413ef5f119f07c45d49c700f8147317e9452f1.png
Выводы:
для работы, связанной с процессором, многопроцессорность всегда быстрее, предположительно из-за GIL
для работы, связанной с вводом-выводом. оба имеют одинаковую скорость
потоки масштабируются только примерно до 4х вместо ожидаемых 8х, так как я использую 8-ми гиперпоточную машину.
Сравните это с работой C POSIX с привязкой к процессору, которая достигает ожидаемого 8-кратного ускорения: что означают слова real, user и sys в выводе time(1)?
TODO: Я не знаю причины этого, должно быть, в игру вступают и другие недостатки Python.
Код теста:
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import sys
def cpu_func(result, niters):
'''
A useless CPU bound function.
'''
for i in range(niters):
result = (result * result * i + 2 * result * i * i + 3) % 10000000
return result
class CpuThread(threading.Thread):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class CpuProcess(multiprocessing.Process):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class IoThread(threading.Thread):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
class IoProcess(multiprocessing.Process):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
if __name__ == '__main__':
cpu_n_iters = int(sys.argv[1])
sleep = 1
cpu_count = multiprocessing.cpu_count()
input_params = [
(CpuThread, cpu_n_iters),
(CpuProcess, cpu_n_iters),
(IoThread, sleep),
(IoProcess, sleep),
]
header = ['nthreads']
for thread_class, _ in input_params:
header.append(thread_class.__name__)
print(' '.join(header))
for nthreads in range(1, 2 * cpu_count):
results = [nthreads]
for thread_class, work_size in input_params:
start_time = time.time()
threads = []
for i in range(nthreads):
thread = thread_class(work_size)
threads.append(thread)
thread.start()
for i, thread in enumerate(threads):
thread.join()
results.append(time.time() - start_time)
print(' '.join('{:.6e}'.format(result) for result in results))
GitHub upstream + построение кода в том же каталоге.
Проверено на Ubuntu 18.10, Python 3.6.7, на ноутбуке Lenovo ThinkPad P51 с процессором: Intel Core i7-7820HQ CPU (4 ядра / 8 потоков), RAM: 2x Samsung M471A2K43BB1-CRC (2x 16GiB), SSD: Samsung MZVLB512HAJQ-000L7 (3000 МБ / с).
Визуализируйте, какие потоки работают в данный момент
Этот пост https://rohanvarma.me/GIL/ научил меня, что вы можете запускать обратный вызов всякий раз, когда поток запланирован с target=
аргумент threading.Thread
и то же самое для multiprocessing.Process
.
Это позволяет нам видеть, какой именно поток запускается каждый раз. Когда это будет сделано, мы увидим что-то вроде (я построил этот конкретный график):
+--------------------------------------+
+ Active threads / processes +
+-----------+--------------------------------------+
|Thread 1 |******** ************ |
| 2 | ***** *************|
+-----------+--------------------------------------+
|Process 1 |*** ************** ****** **** |
| 2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
+ Time --> +
+--------------------------------------+
который показал бы, что:
- потоки полностью сериализованы GIL
- процессы могут работать параллельно
Я считаю, что эта ссылка изящно отвечает на ваш вопрос.
Короче говоря, если одна из ваших подзадач должна подождать, пока другая закончит работу, многопоточность хороша (например, в тяжелых операциях ввода / вывода); напротив, если ваши подзадачи действительно могут возникать одновременно, рекомендуется многопроцессорная обработка. Тем не менее, вы не будете создавать больше процессов, чем количество ядер.
Вот некоторые данные о производительности для Python 2.6.x, которые ставят под сомнение представление о том, что многопоточность является более производительной, чем многопроцессорная обработка в сценариях, связанных с вводом-выводом. Эти результаты получены на 40-процессорной IBM System x3650 M4 BD.
Обработка, связанная с вводом-выводом: пул процессов работал лучше, чем пул потоков
>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms
>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms
Обработка с привязкой к процессору: пул процессов работал лучше, чем пул потоков
>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms
>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms
Это не строгие тесты, но они говорят мне, что многопроцессорность не совсем бесполезна по сравнению с многопоточностью.
Код, используемый в интерактивной консоли Python для вышеуказанных тестов
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob
text_for_test = str(range(1,100000))
def fileio(i):
try :
os.remove(glob('./test/test-*'))
except :
pass
f=open('./test/test-'+str(i),'a')
f.write(text_for_test)
f.close()
f=open('./test/test-'+str(i),'r')
text = f.read()
f.close()
def square(i):
return i*i
def timing(f):
def wrap(*args):
time1 = time.time()
ret = f(*args)
time2 = time.time()
print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
return ret
return wrap
result = None
@timing
def do_work(process_count, items, process_type, method) :
pool = None
if process_type == 'process' :
pool = Pool(processes=process_count)
else :
pool = ThreadPool(processes=process_count)
if method == 'square' :
multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
result = [res.get() for res in multiple_results]
else :
multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
result = [res.get() for res in multiple_results]
do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')
do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')
Разница
Применение
- Когда использовать потоки
- Используйте потоки для IO-Bound
- Чтение или запись файла с жесткого диска.
- Чтение или запись в стандартный вывод, ввод или ошибка (stdin, stdout, stderr).
- Печать документа.
- Загрузка или загрузка файла.
- Запрос сервера.
- Запрос базы данных.
- Сделать фото или записать видео.
- Используйте потоки для IO-Bound
- Когда использовать процессы
- Используйте процессы для задач, связанных с ЦП
- Вычисление точек во фрактале.
- Оценка Пи
- Факторинг простых чисел.
- Парсинг HTML, JSON и т.д. документов.
- Обработка текста.
- Запуск симуляций.
- Используйте процессы для задач, связанных с ЦП
Источник: поток против процесса в Python
Ну, на большинство вопросов отвечает Джулио Франко. Я более подробно остановлюсь на проблеме "потребитель-производитель", которая, я полагаю, поможет вам выбрать правильное решение для использования многопоточного приложения.
fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()
def producer(fill_count, empty_count, buffer):
while True:
item = produceItem()
empty_count.down();
buffer.push(item)
fill_count.up()
def consumer(fill_count, empty_count, buffer):
while True:
fill_count.down()
item = buffer.pop()
empty_count.up()
consume_item(item)
Вы можете прочитать больше о примитивах синхронизации из:
http://linux.die.net/man/7/sem_overview
http://docs.python.org/2/library/threading.html
Псевдокод выше. Я полагаю, вам следует поискать проблему "производитель-потребитель", чтобы получить больше ссылок.