Загрузите один большой файл с клиента для рабочих.

Как сделать один большой файл объемом 8 ГБ доступным для всех других рабочих узлов в dask? я пытался pd.read_csv() с chunksize а также client.scatter но это занимает довольно много времени. Я запускаю его на macOS.

Это мой код:

import time

import pandas as pd
import dask as dask
import dask.distributed as distributed
import dask.dataframe as dd
import dask.delayed as delayed
from dask.distributed import Client, progress


client = Client(IP:PORT)
print client
print client.scheduler_info()
f = []
chunksize = 10 ** 6

for chunk in pd.read_csv('file.csv', chunksize=chunksize):
    f_in = client.scatter(chunk)
    f.append(f_in)
print "read"

ddf = dd.from_delayed(f)        
ddf = ddf.groupby(['col1'])[['col2']].sum()

future = client.compute(ddf)
print future
progress(future)

result = client.gather(future)
print result        

Застрял в этом. Заранее спасибо!

2 ответа

Dask будет разбивать файл на части, если это CSV-файл (не сжатый), не зная, почему вы пытаетесь разделить его на части. Просто делать:

импортировать dask.dataframe как dd df = dd.read_csv('data*.csv')

В своем рабочем процессе вы загружаете данные CSV локально, анализируете их в кадры данных, а затем передаете сериализованные версии этих кадров данных работникам, по одному за раз.

Некоторые возможные решения:

  • скопируйте файл каждому работнику (что расточительно) или поместите его в какое-то место, которое они все могут видеть, например, в общую файловую систему или облачное хранилище

  • использовать файл client.upload_file, который не был рассчитан на большую полезную нагрузку, а также реплицировался на каждого работника

  • использование dask.bytes.read_bytes читать последовательные блоки данных, как и раньше, и сохранять их для рабочих, чтобы, по крайней мере, вы не понесли затрат на сериализацию, а усилия по анализу распределялись между работниками.

Другие вопросы по тегам