Python socket.gethostbyname_ex() многопоточный сбой
Я запрограммировал скрипт, который должен разрешать несколько имен хостов в IP-адреса, используя многопоточность.
Однако, это терпит неудачу и зависает в некоторой случайной точке. Как это можно решить?
num_threads = 100
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database')
cur = conn.cursor()
def mexec(befehl):
cur = conn.cursor()
cur.execute(befehl)
websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array
queue = Queue()
def getips(i, q):
while True:
#--resolve IP--
try:
result = socket.gethostbyname_ex(site)
print(result)
mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb
except (socket.gaierror):
print("no ip")
mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
q.task_done()
#Spawn thread pool
for i in range(num_threads):
worker = Thread(target=getips, args=(i, queue))
worker.setDaemon(True)
worker.start()
#Place work in queue
for site in websites:
queue.put(site)
#Wait until worker threads are done to exit
queue.join()
3 ответа
Вы можете использовать значение Sentinel, чтобы сигнализировать потокам, что нет работы, и присоединяться к потокам вместо queue.task_done()
а также queue.join()
:
#!/usr/bin/env python
import socket
from Queue import Queue
from threading import Thread
def getips(queue):
for site in iter(queue.get, None):
try: # resolve hostname
result = socket.gethostbyname_ex(site)
except IOError, e:
print("error %s reason: %s" % (site, e))
else:
print("done %s %s" % (site, result))
def main():
websites = "youtube google non-existent.example facebook yahoo live".split()
websites = [name+'.com' for name in websites]
# Spawn thread pool
queue = Queue()
threads = [Thread(target=getips, args=(queue,)) for _ in range(20)]
for t in threads:
t.daemon = True
t.start()
# Place work in queue
for site in websites: queue.put(site)
# Put sentinel to signal the end
for _ in threads: queue.put(None)
# Wait for completion
for t in threads: t.join()
main()
gethostbyname_ex()
функция устарела. Для поддержки обоих адресов IPv4/v6 вы можете использовать socket.getaddrinfo()
вместо.
Моя первая идея заключалась в том, что вы получаете ошибки из-за перегрузки в DNS - возможно, ваш распознаватель просто не позволяет вам делать больше, чем определенное количество запросов за раз.
Кроме того, я заметил некоторые проблемы:
Вы забыли назначить
site
правильно вwhile
цикл - который, вероятно, лучше заменитьfor
цикл итерации по очереди, или что-то. В вашей версии вы используетеsite
переменная из пространства имен уровня модуля, которая может привести к двойным запросам и пропущенным другим.В этом месте вы можете контролировать, есть ли в очереди записи или они еще ожидаются. Если обоих нет, вы можете выйти из своей ветки.
По соображениям безопасности, вам лучше сделать
def mexec(befehl, args=None): cur = conn.cursor() cur.execute(befehl, args)
для того, чтобы сделать потом
mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
Чтобы оставаться совместимым с будущими протоколами, вы должны использовать socket.getaddrinfo()
вместо socket.gethostbyname_ex(site)
, Там вы получаете все IP-адреса, которые хотите (сначала вы можете ограничиться IPv4, но переключиться на IPv6 проще), и, возможно, можете поместить их все в БД.
Для вашей очереди примеры кода могут быть
def queue_iterator(q):
"""Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling."""
while True:
try:
item = q.get(block=q.is_filling, timeout=.1)
yield item
q.task_done() # indicate that task is done.
except Empty:
# If q is still filling, continue.
# If q is empty and not filling any longer, return.
if not q.is_filling: return
def getips(i, q):
for site in queue_iterator(q):
#--resolve IP--
try:
result = socket.gethostbyname_ex(site)
print(result)
mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
except (socket.gaierror):
print("no ip")
mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
# Indicate it is filling.
q.is_filling = True
#Spawn thread pool
for i in range(num_threads):
worker = Thread(target=getips, args=(i, queue))
worker.setDaemon(True)
worker.start()
#Place work in queue
for site in websites:
queue.put(site)
queue.is_filling = False # we are done filling, if q becomes empty, we are done.
#Wait until worker threads are done to exit
queue.join()
должен сделать свое дело.
Другая проблема - ваша параллельная вставка в MySQL. Вам разрешено делать только один запрос MySQL за раз. Таким образом, вы можете защитить доступ через threading.Lock()
или же RLock()
или вы можете поместить ответы в другую очередь, которая обрабатывается другим потоком, который может даже связать их.
Вы можете найти его проще в использовании concurrent.futures
чем threading
, multiprocessing
, Queue
непосредственно:
#!/usr/bin/env python3
import socket
# pip install futures on Python 2.x
from concurrent.futures import ThreadPoolExecutor as Executor
hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100
with Executor(max_workers=20) as pool:
for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60):
print(results)
Примечание: вы можете легко перейти от использования потоков к процессам:
from concurrent.futures import ProcessPoolExecutor as Executor
Вам это нужно, если gethostbyname_ex()
не является потокобезопасным в вашей ОС, например, это может быть в OSX.
Если вы хотите обрабатывать исключения, которые могут возникнуть в gethostbyname_ex()
:
import concurrent.futures
with Executor(max_workers=20) as pool:
future2host = dict((pool.submit(socket.gethostbyname_ex, h), h)
for h in hosts)
for f in concurrent.futures.as_completed(future2host, timeout=60):
e = f.exception()
print(f.result() if e is None else "{0}: {1}".format(future2host[f], e))
Это похоже на пример из документации.