Как записать содержимое df в CSV-файл, используя многопроцессорность в Python

У меня есть функция, которая записывает содержимое df в CSV-файл.

def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    headers = []
    fid = open(defFile, 'r')
    for line in fid:
        headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]

    df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')

Как я могу распараллелить этот процесс? В настоящее время я использую следующий код

def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
    logInfo('Start writingtoSchema in parallel...', 'track')
    headers = []
    fid = open(defFile, 'r')
    for line in fid:
        headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]
    out_Names = Queue()
    cores = min([int(multiprocessing.cpu_count() / 2), int(len(outDf) / 200000)+1])
    #cores=4
    logInfo(str(cores) + 'cores are used...', 'track')
    # split the data for parallel computation
    outDf = splitDf(df, cores)
    # process the chunks in parallel
    logInfo('splitdf called are df divided...', 'track')
    Filenames=[]
    procs = []
    fname=toFile.split("_Opera_output")[0]
    for i in range(0, cores):
        filename=fname+"_"+str(i)+".tsv"
        proc = Process(target=writeToSchema, args=(outDf[i], defFile,filename, retainFlag,delim, quotechar,i))
        procs.append(proc)
        proc.start()
        print 'processing '+str(i)
        Filenames.append(filename)
# combine all returned chunks
#   outDf = out_Names.get()
#   for i in range(1, cores):
#       outDf = outDf.append(out_q.get(), ignore_index=True)
    for proc in procs:
        proc.join()
    logInfo('Now we merge files...', 'track')
    print Filenames
    with open(toFile,'w') as outfile:
        for fname in Filenames:
            with open(fname) as infile:
                 for line in infile:
                     outfile.write(line)

Но это не сработало и выдает следующую ошибку

2017-12-17 16:02:55,078 - track - ERROR: Traceback (most recent call last):
2017-12-17 16:02:55,078 - track - ERROR:   File "C:/Users/sudhir.tiwari/Document
s/AMEX2/Workspace/Backup/Trunk/code/runMapping.py", line 257, in <module>
2017-12-17 16:02:55,089 - track - ERROR: writeToSchemaParallel(outDf, defFile, t
oFile, retainFlag, delim='\t', quotechar='"')
2017-12-17 16:02:55,153 - track - ERROR:   File "C:\Users\sudhir.tiwari\Document
s\AMEX2\Workspace\Backup\Trunk\code\utils.py", line 510, in writeToSchemaParalle
l
2017-12-17 16:02:55,163 - track - ERROR: with open(fname) as infile:
2017-12-17 16:02:55,198 - track - ERROR: IOError
2017-12-17 16:02:55,233 - track - ERROR: :
2017-12-17 16:02:55,233 - track - ERROR: [Errno 2] No such file or directory: 'C
:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Input/work/Schindler_20171130/Sc
hindler_20171130_0.tsv'

И это не записывалось в файлы, так как, когда я ищу местоположение, файлы не найдены. Я использую многопроцессорную обработку для записи данных в несколько файлов, а затем объединяю все. Разделите df, разделите фрейм данных на n частей.

3 ответа

Использование многопроцессорной обработки потребует больше времени, чем использование по умолчанию (прямое сохранение). Используя Синхронизацию между процессами, используйте Процессы и Блокировку для параллельного процесса записи. Ниже приведен пример POC.

import pandas as pd
import numpy as np
from multiprocessing import Lock, Process
from time import time

def writefile(df,l):
    l.acquire()
    df.to_csv('dataframe-multiprocessing.csv', index=False, mode='a', header=False)
    l.release()


if __name__ == '__main__':
    a = np.random.randint(1,1000,10000000)
    b = np.random.randint(1,1000,10000000)
    c = np.random.randint(1,1000,10000000)

    df = pd.DataFrame(data={'a':a,'b':b,'c':c})

    print('Iterative way:')
    print()
    new = time()
    df.to_csv('dataframe-conventional.csv', index=False, mode='a', header=False)
    print(time() - new, 'seconds')

    print()    
    print('Multiprocessing way:')
    print()
    new = time()
    l = Lock()
    p = Process(target=writefile, args=(df,l))
    p.start()
    p.join()
    print(time() - new, 'seconds')
    print()

    df1 = pd.read_csv('dataframe-conventional.csv')
    df2 = pd.read_csv('dataframe-multiprocessing.csv')
    print('If both file same or not:')
    print(df1.equals(df2))

Результат:

C:\Users\Ariff\Documents\GitHub\testing-code>python pandas_multip.py
Iterative way:

18.323541402816772 seconds

Multiprocessing way:

20.14128303527832 seconds

If both file same or not:
True

Вы можете отслеживать использование процессора при использовании to_csv, пока ваш процессор простаивает, вы можете ускорить прогресс, используя multiprocess.
Простой код следующим образом:

      import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def write_csv(df, filename):
    df.to_csv(filename)


df = pd.DataFrame({'c': ['a'*100]*100_000_000, })

N = 8
parts = np.array_split(df, N)

Parallel(n_jobs=N)(delayed(write_csv)(
    part, f'part_{i}') for i, part in enumerate(parts))

Это стоит 35sв моей машине.
Однако,

      df.to_csv

расходы 3min.

Если вы записываете файл на диск, нет смысла распараллеливать его. Из-за записи данных на диск не является параллельным по своей природе. И файл всегда будет записан на диск операционной системой. Таким образом, вы не можете добиться какой-либо производительности, написав программные коды высокого уровня.

Другие вопросы по тегам