Gevent / запросы зависает при выполнении большого количества запросов головы
Мне нужно сделать 100 000 запросов на головку, и я использую gevent поверх запросов. Мой код работает некоторое время, но затем в конечном итоге зависает. Я не уверен, почему он висит, или он висит внутри запросов или gevent. Я использую аргумент timeout внутри обоих запросов и gevent.
Пожалуйста, посмотрите на мой фрагмент кода ниже, и дайте мне знать, что я должен изменить.
import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests
def get_head(url, timeout=3):
try:
return requests.head(url, allow_redirects=True, timeout=timeout)
except:
return None
def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
p = pool.Pool(chunk_size)
print 'Expanding %d short_urls' % len(short_urls)
results = {}
for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
gevent.joinall(jobs, timeout=timeout)
results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
return results
Я пробовал греквесты, но он был заброшен, и я прошел через запросы на получение github, но у них всех тоже есть проблемы.
2 ответа
Использование ОЗУ, которое вы наблюдаете, в основном зависит от всех данных, которые накапливаются при хранении 100000 объектов ответа, и от всех накладных расходов. Я воспроизвел вашу заявку и отправил HEAD-запросы против 15000 URL-адресов из топ-рейтинга Alexa. Это не имеет большого значения
- использовал ли я пул gevent (т. е. один гринлет на соединение) или фиксированный набор гринлетов, причем все запрашивали несколько URL-адресов
- насколько большой я установил размер пула
В конце концов, использование оперативной памяти значительно возросло. Тем не менее, я заметил, что изменение от requests
в urllib2
уже привело к сокращению использования оперативной памяти, примерно в два раза. То есть я заменил
result = requests.head(url)
с
request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)
Несколько других советов: не используйте два механизма тайм-аута. Тайм-аут Gevent очень надежен, и вы можете легко использовать его следующим образом:
def gethead(url):
result = None
try:
with Timeout(5, False):
result = requests.head(url)
except Exception as e:
result = e
return result
Может выглядеть сложно, но либо возвращается None
(по истечении почти 5 секунд и указывает на тайм-аут), любой объект исключения, представляющий ошибку связи или ответ. Работает отлично!
Хотя это, вероятно, не является частью проблемы, в таких случаях я рекомендую поддерживать рабочих в живых и позволять им работать над несколькими предметами каждый! Накладные расходы на нерестовые гринлеты невелики. Тем не менее, это будет очень простое решение с набором долгоживущих гринлетов:
def qworker(qin, qout):
while True:
try:
qout.put(gethead(qin.get(block=False)))
except Empty:
break
qin = Queue()
qout = Queue()
for url in urls:
qin.put(url)
workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]
Кроме того, вы действительно должны понимать, что это крупномасштабная проблема, которую вы решаете там, приводя к нестандартным проблемам. Когда я воспроизвел ваш сценарий с тайм-аутом в 20 с и 100 рабочих и 15000 URL-адресов, которые нужно запросить, я легко получил большое количество сокетов:
# netstat -tpn | wc -l
10074
То есть ОС имела более 10000 сокетов для управления, большинство из них в состоянии TIME_WAIT. Я также наблюдал ошибки "Too many open files" и настраивал пределы с помощью sysctl. Когда вы запрашиваете 100000 URL-адресов, вы, вероятно, также достигнете таких пределов, и вам нужно будет принять меры, чтобы предотвратить голодание системы.
Также обратите внимание на то, как вы используете запросы, он автоматически выполняет перенаправления с HTTP на HTTPS и автоматически проверяет сертификат, который, безусловно, стоит оперативной памяти.
В моих измерениях, когда я делил количество запрошенных URL-адресов на время выполнения программы, я почти никогда не пропускал 100 ответов / с, что является результатом высокоскоростных соединений с иностранными серверами по всему миру. Я думаю, что на вас также влияет такой предел. Отрегулируйте остальную часть архитектуры до этого предела, и вы, вероятно, сможете генерировать поток данных из Интернета на диск (или базу данных) с небольшим использованием оперативной памяти между ними.
Я должен ответить на два ваших главных вопроса, а именно:
Я думаю, что gevent/ то, как вы используете это не ваша проблема. Я думаю, вы просто недооцениваете сложность своей задачи. Это сопровождается неприятными проблемами и доводит вашу систему до предела.
проблема использования оперативной памяти: начните с использования
urllib2
, если ты можешь. Затем, если вещи накапливаются все еще слишком высоко, вам нужно работать против накопления. Попробуйте создать устойчивое состояние: вы можете начать записывать данные на диск и, как правило, работать в направлении ситуации, когда объекты могут стать сборщиками мусора.Ваш код "в конечном итоге зависает": возможно, это связано с проблемой ОЗУ. Если это не так, то не порождайте так много гринлетов, а используйте их по назначению. Кроме того, еще больше уменьшите параллелизм, контролируйте количество открытых сокетов, увеличьте системные ограничения при необходимости, и попытайтесь выяснить, где именно висит ваше программное обеспечение.
Я не уверен, что это решит вашу проблему, но вы не используете pool.Pool() правильно.
Попробуй это:
def expand_short_urls(short_urls, chunk_size=100):
# Pool() automatically limits your process to chunk_size greenlets running concurrently
# thus you don't need to do all that chunking business you were doing in your for loop
p = pool.Pool(chunk_size)
print 'Expanding %d short_urls' % len(short_urls)
# spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
# NOT the value your function, get_head, will return
threads = [p.spawn(get_head, short_url) for short_url in short_urls]
p.join()
# to access the returned value of your function, access the Greenlet.value property
results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads)
если thread.value не None и thread.value.status_code == 200} возвращает результаты