Как перебирать кадры данных pandas строка за строкой с 10M строками
Я работаю над веб-службами Python Flask, и это требование похоже... Приложение будет получать один файл в формате csv или xlsx. Я прочитал этот файл и преобразовал его в фреймворк Pandas. Теперь мне нужно перебрать каждую строку фрейма данных и проверить конкретное условие. если условие выполняется, необходимо обновить несколько столбцов в одном фрейме данных.
Я сделал это, используя приведенный ниже код, но меня не устраивает производительность...
def ExecuteInParallel(convertContract,ratesDf,inputDf):
for index, row in inputDf.iterrows():
currencyFound = ratesDf.query('CCY1 =="{0}" and CCY2 == "{1}"'.format(row[convertContract.INPUT_CURRENCY]
,row[convertContract.RETURN_CURRENCY]))
if(len(currencyFound.index) == 0):
raise BadRequest("Given Currency combination not found with provided date.")
currentrate = currencyFound.Rate.values[0]
if(convertContract.ROUNDING != None and convertContract.ROUNDING != ""):
rounding = int(convertContract.ROUNDING)
if(rounding > 0):
convertedamount = round(float(row[convertContract.INPUT_AMOUNT]) * currentrate,int(convertContract.ROUNDING))
inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
else:
convertedamount = float(row[convertContract.INPUT_AMOUNT]) * currentrate
inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
if(convertContract.RETURN_RATE == "True"):
inputDf.at[index,convertContract.RETURN_VALUE + "_FX Rate"] = currentrate
Я провел некоторый анализ производительности и пришел к выводу, что перебор 10 тыс. Строк занимает около 470 секунд.
Я хочу выполнить это для 10 миллионов строк. Поэтому я попробовал программировать потоки в Python, сохранив вышеуказанный вызов функции, но с меньшими фреймами данных. Я создал патроны из 500 строк данных и перешел к указанному выше методу, это собственный патрон, но не было замечено ни одной секунды.
Кто-нибудь может мне с этим помочь.
def ConvertdataFramesValues(self,contract,ratesDf,inputDf):
try:
treadList = []
size = 500
list_of_dfs = list(inputDf.loc[i:i + size - 1,:] for i in range(0, len(inputDf),size))
for frame in list_of_dfs:
t1 = threading.Thread(target=ExecuteInParallel,args=(convertContract,ratesDf,frame))
treadList.append(t1)
t1.start()
for t in treadList:
t.join()
inputDf = pd.concat(list_of_dfs)
print(list_of_dfs[0].head())
return inputDf
except Exception as e:
msg = "unable to convert data frame values. " + str(e)
print(msg)
raise BadRequest(msg)