Запись пакета dask с фреймом данных на диск (создание 2 миллионов функций с помощью dask и featuretools)

Я очень новичок как в Dask, так и в Featuretools, поэтому у меня возникло много трудностей при их объединении для параллельной разработки функций.

Короткая версия: решение насущной проблемы У меня есть сумка для напитков dfs из pandas DataFrame и хотите вывести их как csv с каждым файлом, имеющим раздел в качестве идентификатора. to_textfiles () выдал ошибку, и я не могу найти способ получить номер раздела для использования dfs.map(pd.to_csv, "[partition_num].csv"), Есть ли способ сделать это?

>>> dfs 
dask.bag<map-par..., npartitions=2>

>>> type(dfs.compute()[0])
pandas.core.frame.DataFrame

>>> dfs.to_textfiles('feature_matrices/calculate_matrix/*_test')

anaconda3/envs/featuretools/lib/python3.6/site-packages/dask/utils.py in ensure_unicode()
    592         return s.decode()
    593     msg = "Object %s is neither a bytes object nor has an encode method"
--> 594     raise TypeError(msg % s)

TypeError: ('Long error message', 'Object                 Age          ArrivalMethod\nPAT_ENC_CSN_ID                            \n3223775624       33                    Car\n3223776450       82         Medical Flight\n3223776487       65                  Other\n3223776543       31              Ambulance\n3223835687       89              Ambulance\n3223838474       42  Public Transportation\n3223842283       11              Ambulance\n3223845045       60              A

Длинная версия: Для тех, кто интересуется, почему у меня пакет данных с пандами, я помещаю всю свою проблему здесь в поисках лучшего подхода. Я пытаюсь использовать featuretools для создания 2 миллионов объектов для набора данных из 22 тыс. Строк (для выбора объектов позже). Я пытаюсь следовать ссылкам ( этот пост и этот блокнот). В записной книжке набор данных был огромным (45 миллионов строк) и намного больше, чем мой набор данных из 22 000 строк.

Тем не менее, я разбил свой раздел на 741 ряд, так как entity set полных данных для Calculate_feature_matrix имел последовательный компонент, который занимал слишком много времени (вероятно, для распространения entity set к работникам). Это происходит, даже если я создаю только одну функцию со всем набором данных. Ни один из моих dask-workers ( LSFCluster) загрузка ЦП превысила 5% после 20 минут работы calculate_matrix и это привело к огромному отслеживанию ошибок:

Использование всего набора данных с одной функцией:

...
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "/path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
Exception in callback BaseAsyncIOLoop._handle_events(110, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(110, 1)>
Traceback (most recent call last):
  File "/path/anaconda3/envs/featuretools/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/lab/corradin_data/FOR_AN/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files

В дополнение к разделению набора данных, я также разделяю по функциям, выполняя по одной функции за раз. Теперь я хочу записать эту функцию на диск, но хочу объединить их в куски по 1 тыс. Вместо вывода 2 млн. CSV-файлов. Ниже мой подход до сих пор, который заканчивается dask bag из pandas DataFrame

Для каждого раздела из 741 строки рассчитайте по одному объекту за раз:

from dask_jobqueue import LSFCluster
from dask.distributed import Client
cluster = LSFCluster(...)
client = Client(cluster)

# take a feature, return a feature matrix for a subset of data
def make_feature(feature):
    feature_name = feature.generate_name()
    try:
        feature_matrix = ft.calculate_feature_matrix(feature, entityset=es, n_jobs= 1, verbose = 1) #es is one partition of dataset

        print(f"Finished generating feature {feature_name}")
        return feature_matrix
    except:
        print(f"Could not make feature: {feature_name}")
        print("--------")
        return None

import dask.bag as db
b = db.from_sequence(feature_list, partition_size=1000) # 1k feature per partition
b = b.map(make_feature) 

#concatenate 1k dataframe (1 partition) to 1 df 
def concat(partition):
    series = [i for i in partition]
    df = pd.concat(series,axis =1)
    return [df]

dfs = b.map_partitions(concat) # dask bag of dataframes

overall_start = timer()
dfs.compute()
overall_end = timer()

print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")

#ouput to disk here
???

Это мой первый вопрос, поэтому, пожалуйста, дайте мне знать, что исправить / добавить, чтобы мой вопрос был понятнее. Спасибо!

0 ответов

Другие вопросы по тегам