Параллельно работает для цикла через Python
У меня есть процесс, который перебирает список IP-адресов и возвращает некоторую информацию о них. Простой цикл for отлично работает, моя проблема - запустить его в масштабе из-за блокировки глобального интерпретатора Python (GIL).
Моя цель - запустить эту функцию параллельно и в полной мере использовать мои 4 ядра. Таким образом, когда я запускаю 100K из них, мне не понадобится 24 часа через обычный цикл for.
После прочтения ответов здесь, в частности, на этот вопрос, Как я могу распараллелить простой цикл Python? Я решил использовать joblib. Когда я запускаю 10 записей через него (пример выше), это заняло более 10 минут. Это не похоже, что это работает правильно. Я знаю, что я что-то делаю неправильно или не понимаю. Любая помощь очень ценится!
import pandas as pd
import numpy as np
import os as os
from ipwhois import IPWhois
from joblib import Parallel, delayed
import multiprocessing
num_core = multiprocessing.cpu_count()
iplookup = ['174.192.22.197',\
'70.197.71.201',\
'174.195.146.248',\
'70.197.15.130',\
'174.208.14.133',\
'174.238.132.139',\
'174.204.16.10',\
'104.132.11.82',\
'24.1.202.86',\
'216.4.58.18']
Нормальный цикл, который работает нормально!
asn=[]
asnid=[]
asncountry=[]
asndesc=[]
asnemail = []
asnaddress = []
asncity = []
asnstate = []
asnzip = []
asndesc2 = []
ipaddr=[]
b=1
totstolookup=len(iplookup)
for i in iplookup:
i = str(i)
print("Running #{} out of {}".format(b,totstolookup))
try:
obj=IPWhois(i,timeout=15)
result=obj.lookup_whois()
asn.append(result['asn'])
asnid.append(result['asn_cidr'])
asncountry.append(result['asn_country_code'])
asndesc.append(result['asn_description'])
try:
asnemail.append(result['nets'][0]['emails'])
asnaddress.append(result['nets'][0]['address'])
asncity.append(result['nets'][0]['city'])
asnstate.append(result['nets'][0]['state'])
asnzip.append(result['nets'][0]['postal_code'])
asndesc2.append(result['nets'][0]['description'])
ipaddr.append(i)
except:
asnemail.append(0)
asnaddress.append(0)
asncity.append(0)
asnstate.append(0)
asnzip.append(0)
asndesc2.append(0)
ipaddr.append(i)
except:
pass
b+=1
Функция, чтобы перейти к JobLib для запуска на всех ядрах!
def run_ip_process(iplookuparray):
asn=[]
asnid=[]
asncountry=[]
asndesc=[]
asnemail = []
asnaddress = []
asncity = []
asnstate = []
asnzip = []
asndesc2 = []
ipaddr=[]
b=1
totstolookup=len(iplookuparray)
for i in iplookuparray:
i = str(i)
print("Running #{} out of {}".format(b,totstolookup))
try:
obj=IPWhois(i,timeout=15)
result=obj.lookup_whois()
asn.append(result['asn'])
asnid.append(result['asn_cidr'])
asncountry.append(result['asn_country_code'])
asndesc.append(result['asn_description'])
try:
asnemail.append(result['nets'][0]['emails'])
asnaddress.append(result['nets'][0]['address'])
asncity.append(result['nets'][0]['city'])
asnstate.append(result['nets'][0]['state'])
asnzip.append(result['nets'][0]['postal_code'])
asndesc2.append(result['nets'][0]['description'])
ipaddr.append(i)
except:
asnemail.append(0)
asnaddress.append(0)
asncity.append(0)
asnstate.append(0)
asnzip.append(0)
asndesc2.append(0)
ipaddr.append(i)
except:
pass
b+=1
ipdataframe = pd.DataFrame({'ipaddress':ipaddr,
'asn': asn,
'asnid':asnid,
'asncountry':asncountry,
'asndesc': asndesc,
'emailcontact': asnemail,
'address':asnaddress,
'city':asncity,
'state': asnstate,
'zip': asnzip,
'ipdescrip':asndesc2})
return ipdataframe
запустить процесс, используя все ядра через joblib
Parallel(n_jobs=num_core)(delayed(run_ip_process)(iplookuparray) for i in iplookup)