Как предварительно кэшировать dask.dataframe для всех рабочих и разделов, чтобы уменьшить потребность в связи

Иногда привлекательно использовать dask.dataframe.map_partitions для операций, таких как слияния. В некоторых случаях при объединении left_df и right_df с помощью map_partitionsЯ хотел бы, по существу, предварительно кэшировать right_df перед выполнением слияния, чтобы уменьшить нагрузку на сеть / локальные перетасовки. Есть ли какой-нибудь четкий способ сделать это? Такое ощущение, что это должно быть возможно с одним из или комбинацией client.scatter(the_df), client.run(func_to_cache_the_df)или какое-то другое интеллектуальное вещание.

Это особенно заметно в контексте выполнения левого соединения на большом left_df с гораздо меньшим right_df это по сути таблица поиска. Такое ощущение, что это right_df должна быть в состоянии считывать в память и сохранять / разбрасывать по всем рабочим / разделам перед объединением, чтобы уменьшить потребность в межсекторном обмене данными до самого конца. Как я могу рассеять right_df успешно сделать это?

Ниже приведен меньший пример такого несбалансированного слияния с использованием cuDF и Dask (но концептуально это будет то же самое с пандами и Dask):

import pandas as pd
import cudf
import dask_cudf
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# create a local CUDA cluster
cluster = LocalCUDACluster()
client = Client(cluster)

np.random.seed(12)

nrows_left = 1000000
nrows_right = 1000

left = cudf.DataFrame({'a': np.random.randint(0,nrows_right,nrows_left), 'left_value':np.arange(nrows_left)})
right = cudf.DataFrame({'a': np.arange(nrows_right), 'lookup_val': np.random.randint(0,1000,nrows_right)})

print(left.shape, right.shape) # (1000000, 2) (1000, 2)

ddf_left = dask_cudf.from_cudf(left, npartitions=500)
ddf_right = dask_cudf.from_cudf(right, npartitions=2)

def dask_merge(L, R):
    return L.merge(R, how='left', on='a')

result = ddf_left.map_partitions(dask_merge, R=ddf_right).compute()
result.head()
<cudf.DataFrame ncols=3 nrows=5 >
     a  left_value  lookup_val
0  219        1952         822
1  873        1953         844
2  908        1954         142
3  290        1955         810
4  863        1956         910

1 ответ

Если вы выполните одно из следующих действий, все должно быть в порядке:

  • Объединение с одним разделом dask dataframe
  • Слияние с некадровым фреймом данных (например, Pandas или cuDF)
  • Map_partitions с некадровым фреймом данных (например, Pandas или cuDF)

Что происходит, это:

  1. Один раздел выталкивается на одного работника
  2. Во время выполнения несколько работников будут дублировать эти данные, а затем другие будут дублироваться от этих работников и т. Д., Передавая данные в виде дерева
  3. Рабочие сделают слияние, как ожидается

Это примерно так быстро, как можно ожидать. Однако, если вы делаете что-то вроде бенчмаркинга и хотите разделить шаги 1,2 и 3, тогда вы можете использовать client.replicate:

left = ... # multi-partition dataframe
right = ... # single-partition dataframe
right = right.persist()  # make sure it exists in one worker
client.replicate(right)  # replicate it across many workers

... proceed as normal

Это не будет быстрее, но шаги 1,2 будут перенесены в шаг репликации.

В вашем примере это выглядит так right имеет два раздела. Вы можете изменить это на один. Dask использует другой путь кода, который по сути map_partitions, в этом случае.

Другие вопросы по тегам