Многопроцессорная обработка Python для безопасной записи в файл
Я пытаюсь решить большую числовую проблему, которая включает в себя множество подзадач, и я использую многопроцессорный модуль Python (в частности, Pool.map) для разделения разных независимых подзадач на разные ядра. Каждая подзадача включает в себя вычисление множества подзадач, и я пытаюсь эффективно запоминать эти результаты, сохраняя их в файле, если они еще не были вычислены каким-либо процессом, в противном случае пропустите вычисление и просто прочитайте результаты из файла.
У меня проблемы с параллелизмом файлов: разные процессы иногда проверяют, была ли вычислена подзадача (ищет файл, в котором будут сохранены результаты), видят, что это не так, запускаем вычисление, затем попробуйте записать результаты в один и тот же файл одновременно. Как мне избежать подобных коллизий?
5 ответов
@GP89 упомянул хорошее решение. Используйте очередь для отправки задач записи выделенному процессу, у которого есть единственный доступ для записи в файл. Все остальные работники имеют доступ только для чтения. Это устранит столкновения. Вот пример, который использует apply_async, но он также будет работать с картой:
import multiprocessing as mp
import time
fn = 'c:/temp/temp.txt'
def worker(arg, q):
'''stupidly simulates long running process'''
start = time.clock()
s = 'this is a test'
txt = s
for i in xrange(200000):
txt += s
done = time.clock() - start
with open(fn, 'rb') as f:
size = len(f.read())
res = 'Process' + str(arg), str(size), done
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
f = open(fn, 'wb')
while 1:
m = q.get()
if m == 'kill':
f.write('killed')
break
f.write(str(m) + '\n')
f.flush()
f.close()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for i in range(80):
job = pool.apply_async(worker, (i, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
if __name__ == "__main__":
main()
Мне кажется, что вам нужно использовать Менеджер, чтобы временно сохранить результаты в список, а затем записать результаты из списка в файл. Кроме того, используйте starmap для передачи объекта, который вы хотите обработать, и управляемого списка. Первым шагом является создание параметра для передачи в starmap, который включает в себя управляемый список.
from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd```
def worker(row, param):
# do something here and then append it to row
x = param**2
row.append(x)
if __name__ == '__main__':
pool_parameter = [] # list of objects to process
with Manager() as mgr:
row = mgr.list([])
# build list of parameters to send to starmap
for param in pool_parameter:
params.append([row,param])
with Pool() as p:
p.starmap(worker, params)
С этого момента вам нужно решить, как вы собираетесь обрабатывать список. Если у вас есть тонны оперативной памяти и огромный набор данных, не стесняйтесь объединять, используя панд. Затем вы можете очень легко сохранить файл в формате CSV или маринад.
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
В ответ на комментарии о том, что это выполняется в кластере, простой вариант, который не зависит от межпроцессного взаимодействия, — заблокировать файл мемоизации с помощью fcntl из стандартной библиотеки Python.
Это работает на MacOS, и я ожидаю, что оно будет работать на большинстве систем Unix, хотя его необходимо будет протестировать на вашей конкретной реализации сетевого хранилища:
Safe.py
import fcntl
import time
def myprint(*args):
print(time.ctime(), *args)
def main():
with open("safe.txt", "r+") as file:
myprint("locking")
# this will block (unless LOCK_EX | LOCK_NB is used)
fcntl.lockf(file, fcntl.LOCK_EX)
lines = file.readlines()
# make race conditions more likely
time.sleep(1)
# "1" or one more than the the previous entry
newval = int(lines[-1])+1 if lines else 1
print(newval)
file.write(str(newval) + "\n")
file.flush()
myprint("unlocking")
fcntl.lockf(file, fcntl.F_UNLCK)
if __name__ == '__main__':
main()
Вы можете проверить, работает ли он локально, запустив это в терминале:
touch safe.txt # this needs to already exist
for x in 1 2 3 4 5
do
python safe.py &
done
cat safe.txt # should have 1-5 inside
Если вы объедините это с многопроцессорностью, каждому процессу, вероятно, понадобится свой собственный файловый дескриптор (поэтому запуститеopen()
отдельно в каждом процессе).
Я подумал, что опубликую свое решение и для несколько более простой проблемы, поскольку всякий раз, когда я ищу свою проблему, появляется эта страница.
Я в некоторой степени основывал это на решении @MikeHunter, приведенном выше . Причина, по которой мне нужно что-то немного другое, заключается в том, что массивы, которые я хочу записать в конце каждого процесса, довольно велики, что означает, что помещение их в очередь, получение их из очереди и запись их с использованием другого процесса требуют большого количества травления. и рассолка чрезвычайно больших массивов. Это не решает проблему проверки многих подзадач и подзадач, как того требует ОП, но обрабатывает «название» вопроса!
Так что мне делать?
Я анализирую блокировку, к которой имеют доступ все процессы, и записываю файл внутриLock.acquire()
иLock.release()
обертка. Таким образом, ни один из процессов не может писать, когда любой другой. Все это для последовательной записи в файлы HDF5 без необходимости компиляции MPI.
from multiprocessing import Process, Queue, Lock
import h5py
import numpy as np
from time import sleep, time
def func(i, l, filename, subfilename):
# Reading from the subfile
with h5py.File(subfilename, 'r') as ds:
array = ds['array'][:]
sleeptime = np.random.rand(1)*4 + 1
sleep(sleeptime[0])
# Print array loaded to compare to output in the summary file
print(i, f'{sleeptime[0]:.3f}', array)
# Lock out any other process from writing to the summary file
l.acquire()
with h5py.File(filename, 'r+') as ds:
ds['array'][i, :] = array
# Release the lock
l.release()
if __name__ == '__main__':
N = 10
Nsample = 5
subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]
for i in range(N):
with h5py.File(subfilenames[i], 'w') as ds:
disp = ds.create_dataset(
'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')
filename = 'test.h5'
with h5py.File(filename, 'w') as ds:
disp = ds.create_dataset('array', (N, Nsample), dtype='f')
# Create a lock that is communicated to the workers
l = Lock()
# Start the timer
t0 = time()
# Distribute the work to the workers
processes = []
print(" T sleeptime array", flush=True)
print("-----------------------", flush=True)
for i in range(N):
p = Process(target=func, args=(
i, l, filename, subfilenames[i]))
p.start()
processes.append(p)
# Wait for the workers to finish
for p in processes:
p.join()
# Print time taken
print(f'Total time taken: {time()-t0:.2f} s')
Если вы сохраните скрипт какhello.py
вы можете запустить и отсортировать вывод следующим образом:
python hello.py | sort
Что должно генерировать что-то вроде этого:
T sleeptime array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s
Проверьте записанный файл HDF5:
h5dump test.h5
что должно привести к чему-то вроде этого
HDF5 "test.h5" {
GROUP "/" {
DATASET "array" {
DATATYPE H5T_IEEE_F32LE
DATASPACE SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
DATA {
(0,0): 4, 1, 1, 0, 2,
(1,0): 2, 1, 1, 1, 3,
(2,0): 1, 2, 2, 4, 3,
(3,0): 1, 4, 4, 3, 0,
(4,0): 4, 4, 4, 4, 1,
(5,0): 1, 3, 1, 0, 4,
(6,0): 4, 1, 0, 3, 0,
(7,0): 3, 4, 4, 0, 4,
(8,0): 4, 3, 3, 1, 4,
(9,0): 3, 3, 3, 4, 0
}
}
}
}
Обратите внимание на обновление
Сначала я использовал очередь для своего варианта использования, но понял, что простойmultiprocessing.Lock
сделал бы свое дело. Нет необходимости в сложномQueue.put
Queue.get
сворачивать.
Обратите внимание, что есть более эффективные способы сделать это с помощью mpi4py, но мне нужно, чтобы пользователь не беспокоился о MPI.
После долгих поисков в Google вот что я нашел. В этом примере я анализирую CSV-файл, содержащий 20 миллионов строк, запускаю Apply для каждого фрагмента и затем сохраняю выходные данные обратно в файл:
def process_chunk(df, i, lock):
chunk_data = []
try:
df = df.fillna('')
df.apply(lambda row: "0" if row['items'] == "[]" else process(row, chunk_data), axis=1)
apply_data = pd.DataFrame(chunk_data)
df = pd.merge(df, election_data, left_index=True, right_index=True)
with lock:
print('writing to file')
df.to_csv('output_test.csv', header=(i==0), mode='a')
except Exception as e:
print(f'error here: {e}')
return 'DONE'
if __name__ == "__main__":
chunks = pd.read_csv('really_big_text_file.txt', header=None, low_memory=False, dtype=str, on_bad_lines='skip', encoding='latin-1', chunksize=100000)
pool = ProcessPoolExecutor(max_workers=6)
m = multiprocessing.Manager()
lock = m.Lock()
futures = [pool.submit(process_chunk, chunk, i, lock) for i, chunk in enumerate(chunks)]
for future in as_completed(futures):
print(future.result())