Противоречивое время обработки в распределенном фастпарке
У меня есть файл паркета в формате улья и мгновенное сжатие. Он помещается в память, и pandas.info предоставляет следующие данные.
Количество строк в группе в файле паркета составляет всего 100 КБ
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM=
Data columns (total 8 columns):
payment_method_id int16
payment_plan_days int16
plan_list_price int16
actual_amount_paid int16
is_auto_renew bool
transaction_date datetime64[ns]
membership_expire_date datetime64[ns]
is_cancel bool
dtypes: bool(2), datetime64[ns](2), int16(4)
memory usage: 698.7+ MB
Теперь, делая несколько простых расчетов с помощью dask, я получаю
Использование потоков
>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:44:50 2017'
141.98732048354384
'Fri Oct 13 23:44:59 2017'
Использование распределенного (локальный кластер)
>>> c=Client()
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:47:04 2017'
141.98732048354384
'Fri Oct 13 23:47:15 2017'
>>>
Это было хорошо, около 9 секунд каждый.
Теперь, используя многопроцессорность, приходит сюрприз...
>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime()
'Fri Oct 13 23:50:43 2017'
141.98732048354384
'Fri Oct 13 23:57:49 2017'
>>>
Я ожидал бы, что многопроцессорный и распределенный / локальный кластер будут иметь тот же порядок величины, возможно, с некоторыми отличиями от потоков (хорошо это или плохо)
Тем не менее, многопроцессорность занимает 47 раз больше времени, чтобы сделать простое среднее значение в колонке in16?
Мой env - это просто новая установка conda с необходимыми модулями. Никаких отборов.
почему есть эти различия?? Я не могу управлять dask/ распределенным, чтобы иметь предсказуемое поведение, чтобы иметь возможность разумно выбирать между различными планировщиками в зависимости от характера моей проблемы.
Это всего лишь игрушечный пример, но мне не удалось привести пример в соответствие с моими ожиданиями (как минимум с моим пониманием чтения документов).
Есть ли что-то, что я должен держать в глубине души? или я просто полностью упускаю суть?
Спасибо
JC
1 ответ
С помощью многопоточного планировщика каждая задача имеет доступ ко всей памяти процесса - в данном случае ко всем данным - и поэтому может выполнять свои вычисления без копирования памяти.
Благодаря распределенному планировщику планировщик знает, какой поток и какой работник создает данные, необходимые для последующей задачи, или уже имеет эти данные в памяти. Хитрость планировщика специально предназначена для переноса вычислений на нужного работника, чтобы избежать передачи и копирования данных.
И наоборот, многопроцессный планировщик имеет тенденцию отправлять результаты задач в основной процесс и из него, что может включать в себя много сериализации и копирования. Некоторые задачи могут быть объединены (объединение задач путем вызова многих функций Python в цепочке), но некоторые не могут. Любая сериализация и копирование требуют усилий процессора и, возможно, более важного для вас пространства памяти. Если ваши исходные данные составляют значительную часть общего объема системы, вы, вероятно, заполняете физическую память, что приводит к значительному замедлению.