Многопроцессорная обработка Python для безопасной записи в файл

Я пытаюсь решить большую числовую проблему, которая включает в себя множество подзадач, и я использую многопроцессорный модуль Python (в частности, Pool.map) для разделения разных независимых подзадач на разные ядра. Каждая подзадача включает в себя вычисление множества подзадач, и я пытаюсь эффективно запоминать эти результаты, сохраняя их в файле, если они еще не были вычислены каким-либо процессом, в противном случае пропустите вычисление и просто прочитайте результаты из файла.

У меня проблемы с параллелизмом файлов: разные процессы иногда проверяют, была ли вычислена подзадача (ищет файл, в котором будут сохранены результаты), видят, что это не так, запускаем вычисление, затем попробуйте записать результаты в один и тот же файл одновременно. Как мне избежать подобных коллизий?

5 ответов

Решение

@GP89 упомянул хорошее решение. Используйте очередь для отправки задач записи выделенному процессу, у которого есть единственный доступ для записи в файл. Все остальные работники имеют доступ только для чтения. Это устранит столкновения. Вот пример, который использует apply_async, но он также будет работать с картой:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in xrange(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    f = open(fn, 'wb') 
    while 1:
        m = q.get()
        if m == 'kill':
            f.write('killed')
            break
        f.write(str(m) + '\n')
        f.flush()
    f.close()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()

if __name__ == "__main__":
   main()

Мне кажется, что вам нужно использовать Менеджер, чтобы временно сохранить результаты в список, а затем записать результаты из списка в файл. Кроме того, используйте starmap для передачи объекта, который вы хотите обработать, и управляемого списка. Первым шагом является создание параметра для передачи в starmap, который включает в себя управляемый список.

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd```

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        for param in pool_parameter:
            params.append([row,param])

        with Pool() as p:
            p.starmap(worker, params)

С этого момента вам нужно решить, как вы собираетесь обрабатывать список. Если у вас есть тонны оперативной памяти и огромный набор данных, не стесняйтесь объединять, используя панд. Затем вы можете очень легко сохранить файл в формате CSV или маринад.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')

В ответ на комментарии о том, что это выполняется в кластере, простой вариант, который не зависит от межпроцессного взаимодействия, — заблокировать файл мемоизации с помощью fcntl из стандартной библиотеки Python.

Это работает на MacOS, и я ожидаю, что оно будет работать на большинстве систем Unix, хотя его необходимо будет протестировать на вашей конкретной реализации сетевого хранилища:

Safe.py

      import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)


def main():
    with open("safe.txt", "r+") as file:

        myprint("locking")

        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)

        lines = file.readlines()

        # make race conditions more likely
        time.sleep(1)
        
        # "1" or one more than the the previous entry
        newval = int(lines[-1])+1 if lines else 1

        print(newval)

        file.write(str(newval) + "\n")
        file.flush()

        myprint("unlocking")

        fcntl.lockf(file, fcntl.F_UNLCK)


if __name__ == '__main__':
    main()

Вы можете проверить, работает ли он локально, запустив это в терминале:

      touch safe.txt  # this needs to already exist

for x in 1 2 3 4 5
do
  python safe.py &
done

cat safe.txt  # should have 1-5 inside

Если вы объедините это с многопроцессорностью, каждому процессу, вероятно, понадобится свой собственный файловый дескриптор (поэтому запуститеopen()отдельно в каждом процессе).

Я подумал, что опубликую свое решение и для несколько более простой проблемы, поскольку всякий раз, когда я ищу свою проблему, появляется эта страница.

Я в некоторой степени основывал это на решении @MikeHunter, приведенном выше . Причина, по которой мне нужно что-то немного другое, заключается в том, что массивы, которые я хочу записать в конце каждого процесса, довольно велики, что означает, что помещение их в очередь, получение их из очереди и запись их с использованием другого процесса требуют большого количества травления. и рассолка чрезвычайно больших массивов. Это не решает проблему проверки многих подзадач и подзадач, как того требует ОП, но обрабатывает «название» вопроса!

Так что мне делать?

Я анализирую блокировку, к которой имеют доступ все процессы, и записываю файл внутриLock.acquire()иLock.release()обертка. Таким образом, ни один из процессов не может писать, когда любой другой. Все это для последовательной записи в файлы HDF5 без необходимости компиляции MPI.

      
from multiprocessing import Process, Queue, Lock
import h5py
import numpy as np
from time import sleep, time


def func(i, l, filename, subfilename):

    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]

    sleeptime = np.random.rand(1)*4 + 1
    sleep(sleeptime[0])

    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime[0]:.3f}', array)

    # Lock out any other process from writing to the summary file
    l.acquire()

    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array

    # Release the lock
    l.release()


if __name__ == '__main__':

    N = 10
    Nsample = 5

    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]

    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            disp = ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')

    filename = 'test.h5'

    with h5py.File(filename, 'w') as ds:
        disp = ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a lock that is communicated to the workers
    l = Lock()

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []

    print(" T  sleeptime     array", flush=True)
    print("-----------------------", flush=True)

    for i in range(N):
        p = Process(target=func, args=(
            i, l, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')


Если вы сохраните скрипт какhello.pyвы можете запустить и отсортировать вывод следующим образом:

      python hello.py | sort

Что должно генерировать что-то вроде этого:

       T  sleeptime     array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s

Проверьте записанный файл HDF5:

      h5dump test.h5

что должно привести к чему-то вроде этого

      HDF5 "test.h5" {
GROUP "/" {
   DATASET "array" {
      DATATYPE  H5T_IEEE_F32LE
      DATASPACE  SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
      DATA {
      (0,0): 4, 1, 1, 0, 2,
      (1,0): 2, 1, 1, 1, 3,
      (2,0): 1, 2, 2, 4, 3,
      (3,0): 1, 4, 4, 3, 0,
      (4,0): 4, 4, 4, 4, 1,
      (5,0): 1, 3, 1, 0, 4,
      (6,0): 4, 1, 0, 3, 0,
      (7,0): 3, 4, 4, 0, 4,
      (8,0): 4, 3, 3, 1, 4,
      (9,0): 3, 3, 3, 4, 0
      }
   }
}
}

Обратите внимание на обновление

Сначала я использовал очередь для своего варианта использования, но понял, что простойmultiprocessing.Lockсделал бы свое дело. Нет необходимости в сложномQueue.put Queue.getсворачивать.


Обратите внимание, что есть более эффективные способы сделать это с помощью mpi4py, но мне нужно, чтобы пользователь не беспокоился о MPI.

После долгих поисков в Google вот что я нашел. В этом примере я анализирую CSV-файл, содержащий 20 миллионов строк, запускаю Apply для каждого фрагмента и затем сохраняю выходные данные обратно в файл:

      def process_chunk(df, i, lock):
    chunk_data = []
    try: 
        df = df.fillna('')
        df.apply(lambda row: "0" if row['items'] == "[]" else process(row, chunk_data), axis=1)
        apply_data = pd.DataFrame(chunk_data)
        df = pd.merge(df, election_data, left_index=True, right_index=True)
        
        with lock:
            print('writing to file')
            df.to_csv('output_test.csv', header=(i==0), mode='a')
    except Exception as e: 
        print(f'error here: {e}')

    return 'DONE'
    

if __name__ == "__main__":
    chunks = pd.read_csv('really_big_text_file.txt', header=None, low_memory=False, dtype=str, on_bad_lines='skip', encoding='latin-1', chunksize=100000)

    pool = ProcessPoolExecutor(max_workers=6)
    m = multiprocessing.Manager()
    lock = m.Lock()
    futures = [pool.submit(process_chunk, chunk, i, lock) for i, chunk in enumerate(chunks)]
    for future in as_completed(futures):
        print(future.result())
Другие вопросы по тегам