Dask Scheduler Memory

Наш процесс планировщика dask кажется всплывающим в памяти с течением времени и продолжением выполнения. В настоящее время мы видим, что используется 5 ГБ памяти, что кажется высоким, поскольку все данные предположительно живут на рабочих узлах:

  PID   USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
  31172 atoz      20   0 5486944 5.071g   7100 S 23.8 65.0  92:38.64 dask-scheduler

при запуске планировщика мы будем использовать менее 1 ГБ памяти. Перезапуск сети с помощью client.restart(), похоже, не помогает, только уничтожение самого процесса планировщика и перезапуск освобождают память.

Каково ожидаемое использование памяти для выполнения одной задачи? Действительно ли планировщик поддерживает только указатели на то, какой работник содержит результаты будущего?

----редактировать----

Я думаю, что моя главная проблема в том, почему client.restart() не освобождает память, используемую процессом планировщика. Я, очевидно, не ожидаю, что он освободит всю память, но вернется к базовому уровню. Мы используем client.map для выполнения нашей функции в списке различных входных данных. После выполнения, многократного перезапуска клиента и создания снимков памяти нашего планировщика, мы видим следующий рост: PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 27955 atoz 20 0 670556 507212 13536 R 43.7 6.2 1:23.61 dask-scheduler 27955 atoz 20 0 827308 663772 13536 S 1.7 8.1 16:25.85 dask-scheduler 27955 atoz 20 0 859652 696408 13536 S 4.0 8.5 19:18.04 dask-scheduler 27955 atoz 20 0 1087160 923912 13536 R 62.3 11.3 20:03.15 dask-scheduler 27955 atoz 20 0 1038904 875788 13536 S 3.7 10.7 23:57.07 dask-scheduler 27955 atoz 20 0 1441060 1.163g 12976 S 4.3 14.9 35:54.45 dask-scheduler 27955 atoz 20 0 1646204 1.358g 12976 S 4.3 17.4 37:05.86 dask-scheduler 27955 atoz 20 0 1597652 1.312g 12976 S 4.7 16.8 37:40.13 dask-scheduler

Я думаю, я был просто удивлен, что после выполнения client.restart() мы не видим, как использование памяти возвращается к некоторому базовому уровню.

---- дальнейшие правки ---- Еще немного информации о том, что у нас работает, так как было предложено, чтобы мы передавали большие структуры данных, чтобы отправить их непосредственно работникам.

мы посылаем словарь в качестве входных данных для каждой задачи, когда json выводит dict, большинство из них меньше 1000 символов.

---- еще больше правок: Воспроизведенная проблема ---- Мы воспроизвели эту проблему снова сегодня. Я убил планировщик и перезапустил его, у нас было около 5,4 ГБ свободной памяти, затем мы запустили функцию, которую я вставлю ниже, в 69614 объектов словаря, которые действительно содержат некоторую информацию, основанную на файлах (все наши работники отображаются на одно и то же NFS DataStore и мы используем Dask в качестве распределенной системы анализа файлов.

Вот функция (примечание: squarewheels4 - это самодельный ленивый пакет для извлечения и анализа файлов, он использует Acora и libarchive в качестве своей базы для извлечения файлов из сжатого архива и индексации файла.)

def get_mrc_failures(file_dict):
    from squarewheels4.platforms.ucs.b_series import ChassisTechSupport
    from squarewheels4.files.ucs.managed.chassis import CIMCTechSupportFile
    import re

    dimm_info_re = re.compile(r"(?P<slot>[^\|]+)\|(?P<size>\d+)\|.*\|(?P<pid>\S+)")
    return_dict = file_dict
    return_dict["return_code"] = "NOT_FILLED_OUT"
    filename = "{file_path}{file_sha1}/{file_name}".format(**file_dict)

    try:
        sw = ChassisTechSupport(filename)
    except Exception as e:
        return_dict["return_code"] = "SW_LOAD_ERROR"
        return_dict["error_msg"] = str(e)
        return return_dict

    server_dict = {}

    cimcs = sw.getlist("CIMC*.tar.gz")
    if not cimcs:
        return_dict["return_code"] = "NO_CIMCS"
        return_dict["keys_list"] = str(sw.getlist("*"))
        return return_dict

    for cimc in cimcs:
        if not isinstance(cimc, CIMCTechSupportFile): continue
        cimc_id = cimc.number
        server_dict[cimc_id] = {}

        # Get MRC file
        try:
            mrc = cimc["*MrcOut.txt"]
        except KeyError:
            server_dict[cimc_id]["response_code"] = "NO_MRC"
            continue
        # see if our end of file marker is there, should look like:
        # --- END OF FILE (Done!
        whole_mrc = mrc.read().splitlines()
        last_10 = whole_mrc[-10:]

        eof_line = [l for l in last_10 if b"END OF FILE" in l]
        server_dict[cimc_id]["response_code"] = "EOF_FOUND" if eof_line else "EOF_MISSING"

        if eof_line:
            continue

        # get DIMM types
        hit_inventory_line = False
        dimm_info = []
        dimm_error_lines = []
        equals_count = 0
        for line in whole_mrc:
            # regex each line... sigh
            if b"DIMM Inventory" in line:
                hit_inventory_line = True

            if not hit_inventory_line:
                continue

            if hit_inventory_line and b"=========" in line:
                equals_count += 1
                if equals_count > 2:
                    break
                continue

            if equals_count < 2:
                continue

            # we're in the dimm section and not out of it yet
            line = str(line)
            reg = dimm_info_re.match(line)
            if not reg:
                #bad :/
                dimm_error_lines.append(line)
                continue
            dimm_info.append(reg.groupdict())

        server_dict[cimc_id]["dimm_info"] = dimm_info
        server_dict[cimc_id]["dimm_error_lines"] = dimm_error_lines

    return_dict["return_code"] = "COMPLETED"
    return_dict["server_dict"] = server_dict
    return return_dict

`` `

фьючерсы генерируются как:

futures = client.map(function_name, file_list)

После того, как в этом состоянии моя цель состояла в том, чтобы попытаться восстановиться и сделать так, чтобы dask освободил выделенную память, вот мои усилия: до отмены фьючерсов:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6257840 4.883g   2324 S  0.0 62.6 121:21.93 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.1G        248M        9.9M        415M        383M
Swap:          8.0G        4.3G        3.7G

при отмене фьючерсов:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6258864 5.261g   5144 R 60.0 67.5 122:16.38 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        176M        9.4M        126M         83M
Swap:          8.0G        4.1G        3.9G

после отмены фьючерса:

PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6243760 5.217g   4920 S  0.0 66.9 123:13.80 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        186M        9.4M        132M         96M
Swap:          8.0G        4.1G        3.9G

после выполнения client.restart()

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6177424 5.228g   4912 S  2.7 67.1 123:20.04 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        196M        9.4M        136M        107M
Swap:          8.0G        4.0G        4.0G

Независимо от того, что я пробежал по распределенной системе, я ожидал, что после отмены фьючерса он вернется, по крайней мере, близко к нормальному... и после выполнения client.restart() мы определенно приблизимся к нашей нормальной базовой линии. Я здесь не прав?

--- second repro ---- Воспроизведено поведение (хотя и не полное исчерпание памяти) с помощью этих шагов:

Вот моя рабочая функция

def get_fault_list_v2(file_dict):
    import libarchive
    return_dict = file_dict
    filename = "{file_path}{file_sha1}/{file_name}".format(**file_dict)
    with libarchive.file_reader(filename) as arc:
        for e in arc:
            pn = e.pathname
    return return_dict

Я провел это через 68617 итераций / файлов

перед запуском мы увидели, как много памяти используется: PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ КОМАНДА 12256 atoz 20 0 1345848 1.107g 7972 S 1.7 14.2 47:15.24 dask-планировщик

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        3.1G        162M         22M        4.5G        4.3G
Swap:          8.0G        3.8G        4.2G

После пробежки мы увидели это много:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
12256 atoz      20   0 2461004 2.133g   8024 S  1.3 27.4  66:41.46 dask-scheduler

После выполнения client.restart мы увидели:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
12256 atoz      20   0 2462756 2.134g   8144 S  6.6 27.4  66:42.61 dask-scheduler

1 ответ

Обычно задача должна занимать менее килобайта в планировщике. Есть несколько вещей, которые могут привести к тому, что вы сохраните значительно больше данных, наиболее распространенным из которых является включение данных в график задач, который показан ниже.

Данные, включенные непосредственно в график задач, хранятся в планировщике. Это обычно происходит при использовании больших данных непосредственно в вызовах, таких как submit:

Плохой

x = np.random.random(1000000)  # some large array
future = client.submit(np.add, 1, x)  # x gets sent along with the task

Хорошо

x = np.random.random(1000000)  # some large array
x = client.scatter(x)  # scatter data explicitly to worker, get future back
future = client.submit(np.add, 1, x)  # only send along the future

Этот же принцип существует и при использовании других API. Для получения дополнительной информации я рекомендую предоставить mcve. В противном случае довольно сложно помочь.

Другие вопросы по тегам