Загрузите один большой файл с клиента для рабочих.
Как сделать один большой файл объемом 8 ГБ доступным для всех других рабочих узлов в dask? я пытался pd.read_csv()
с chunksize
а также client.scatter
но это занимает довольно много времени. Я запускаю его на macOS.
Это мой код:
import time
import pandas as pd
import dask as dask
import dask.distributed as distributed
import dask.dataframe as dd
import dask.delayed as delayed
from dask.distributed import Client, progress
client = Client(IP:PORT)
print client
print client.scheduler_info()
f = []
chunksize = 10 ** 6
for chunk in pd.read_csv('file.csv', chunksize=chunksize):
f_in = client.scatter(chunk)
f.append(f_in)
print "read"
ddf = dd.from_delayed(f)
ddf = ddf.groupby(['col1'])[['col2']].sum()
future = client.compute(ddf)
print future
progress(future)
result = client.gather(future)
print result
Застрял в этом. Заранее спасибо!
2 ответа
Dask будет разбивать файл на части, если это CSV-файл (не сжатый), не зная, почему вы пытаетесь разделить его на части. Просто делать:
импортировать dask.dataframe как dd df = dd.read_csv('data*.csv')
В своем рабочем процессе вы загружаете данные CSV локально, анализируете их в кадры данных, а затем передаете сериализованные версии этих кадров данных работникам, по одному за раз.
Некоторые возможные решения:
скопируйте файл каждому работнику (что расточительно) или поместите его в какое-то место, которое они все могут видеть, например, в общую файловую систему или облачное хранилище
использовать файл client.upload_file, который не был рассчитан на большую полезную нагрузку, а также реплицировался на каждого работника
использование
dask.bytes.read_bytes
читать последовательные блоки данных, как и раньше, и сохранять их для рабочих, чтобы, по крайней мере, вы не понесли затрат на сериализацию, а усилия по анализу распределялись между работниками.