Как лучше всего обмениваться статическими данными между клиентом ipyparallel и удаленными механизмами?
Я запускаю одну и ту же симуляцию в цикле с разными параметрами. Каждое моделирование использует панду DataFrame (data
) который только читается, никогда не изменяется. С помощью ipyparallel
(Параллельно IPython), я могу поместить эти DataFrames в пространство глобальных переменных каждого движка в моем представлении перед началом моделирования:
view['data'] = data
Затем у двигателей есть доступ к DataFrame для всех симуляций, которые на них запускаются. Процесс копирования данных (если маринованный, data
составляет 40 МБ), это всего лишь несколько секунд. Однако, похоже, что если количество симуляций растет, использование памяти становится очень большим. Я полагаю, что эти общие данные копируются для каждой задачи, а не только для каждого механизма. Как лучше всего делиться статическими данными только для чтения с клиента с движками? Копирование его один раз для каждого движка является приемлемым, но в идеале его нужно будет копировать только один раз для каждого хоста (у меня 4 движка на host1 и 8 движков на host2).
Вот мой код:
from ipyparallel import Client
import pandas as pd
rc = Client()
view = rc[:] # use all engines
view.scatter('id', rc.ids, flatten=True) # So we can track which engine performed what task
def do_simulation(tweaks):
""" Run simulation with specified tweaks """
# Do sim stuff using the global data DataFrame
return results, id, tweaks
if __name__ == '__main__':
data = pd.read_sql("SELECT * FROM my_table", engine)
threads = [] # store list of tweaks dicts
for i in range(4):
for j in range(5):
for k in range(6):
threads.append(dict(i=i, j=j, k=k)
# Set up globals for each engine. This is the read-only DataFrame
view['data'] = data
ar = view.map_async(do_simulation, threads)
# Our async results should pop up over time. Let's measure our progress:
for idx, (results, id, tweaks) in enumerate(ar):
print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id)
# Store results as a pickle for the future
pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j'])
# Save our results to a pickle file
pd.to_pickle(results, out_file_path + pfile)
print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time)
Если количество симуляторов невелико (~50), то для начала требуется некоторое время, но я начинаю видеть операторы печати. Как ни странно, несколько задач будут назначены на один и тот же механизм, и я не вижу ответа, пока все эти назначенные задачи не будут выполнены для этого механизма. Я ожидаю увидеть ответ от enumerate(ar)
каждый раз, когда одна задача моделирования завершается.
Если количество симуляторов велико (~1000), потребуется много времени, чтобы начать, я вижу, что ЦП работают на всех двигателях, но операторы печати прогресса не видны в течение долгого времени (~40 минут), и когда я вижу прогресс, кажется, большой блок (>100) задач шел к одному и тому же движку и ждал завершения от этого одного движка, прежде чем обеспечить некоторый прогресс. Когда тот двигатель закончил, я увидел ar
объект предоставил новые ответы каждые 4 секунды - возможно, это была временная задержка для записи выходных файлов pickle.
Наконец, host1 также запускает задачу ipycontroller, и его использование памяти возрастает как сумасшедший (задача Python показывает использование>6 ГБ ОЗУ, задача ядра показывает использование 3 ГБ). Движок host2 на самом деле не показывает много использования памяти вообще. Что может вызвать этот всплеск памяти?
2 ответа
Я использовал эту логику в коде пару лет назад, и я получил это. Мой код был что-то вроде:
shared_dict = {
# big dict with ~10k keys, each with a list of dicts
}
balancer = engines.load_balanced_view()
with engines[:].sync_imports(): # your 'view' variable
import pandas as pd
import ujson as json
engines[:].push(shared_dict)
results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()
Если количество симуляторов невелико (~50), то для начала требуется некоторое время, но я начинаю видеть операторы печати. Как ни странно, несколько задач будут назначены на один и тот же механизм, и я не вижу ответа, пока все эти назначенные задачи не будут выполнены для этого механизма. Я ожидаю увидеть ответ от enumerate(ar) каждый раз, когда завершается одна задача симуляции.
В моем случае, my_func()
Это был сложный метод, в котором я помещал множество сообщений, записанных в файл, поэтому у меня были свои операторы печати.
О назначении задачи, как я использовал load_balanced_view()
Я ушел в библиотеку найти свой путь, и он отлично справился.
Если количество симуляторов велико (~1000), потребуется много времени, чтобы начать, я вижу, что ЦП работают на всех двигателях, но операторы печати прогресса не видны в течение долгого времени (~40 минут), и когда я вижу прогресс, кажется, большой блок (>100) задач шел к одному и тому же движку и ждал завершения от этого одного движка, прежде чем обеспечить некоторый прогресс. Когда этот движок завершил работу, я увидел, что объект ar выдавал новые ответы каждые 4 секунды - возможно, это была временная задержка для записи выходных файлов pickle.
Долгое время я этого не испытывал, поэтому ничего не могу сказать.
Я надеюсь, что это может пролить свет на вашу проблему.
PS: как я уже сказал в комментарии, вы можете попробовать многопроцессорную работу. Думаю, я не пытался использовать большие данные только для чтения в качестве глобальной переменной, используя их. Я бы попробовал, потому что это похоже на работу.
Иногда вам нужно разбить вашу группу данных по категориям, чтобы вы были уверены, что каждая подгруппа будет полностью содержаться в одном кластере.
Вот как я обычно это делаю:
# Connect to the clusters
import ipyparallel as ipp
client = ipp.Client()
lview = client.load_balanced_view()
lview.block = True
CORES = len(client[:])
# Define the scatter_by function
def scatter_by(df,grouper,name='df'):
sz = df.groupby([grouper]).size().sort_values().index.unique()
for core in range(CORES):
ids = sz[core::CORES]
print("Pushing {0} {1}s into cluster {2}...".format(size(ids),grouper,core))
client[core].push({name:df[df[grouper].isin(ids)]})
# Scatter the dataframe df grouping by `year`
scatter_by(df,'year')
Обратите внимание, что функция, которую я предлагаю, рассеивает, гарантирует, что каждый кластер будет содержать одинаковое количество наблюдений, что обычно является хорошей идеей.