Python: как отправлять пакеты в многопоточность, а затем поток убивает себя
У меня вопрос. Я хотел бы посылать непрерывные потоки байтов на некоторый хост на определенное количество времени (скажем, 1 минута), используя python.
Вот мой код до сих пор:
#! /usr/bin/env python
import socket
import thread
import time
IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing
def send_data():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((IP, PORT))
count = 1
starttime = time.clock()
while elapsed < test_time:
sent = s.send(DATA + str(count) + "\n")
if sent == 0: break # assume that if nothing is sent -> connection died
count = count+1
elapsed = time.clock() - starttime
if killed:
break
s.close()
print str(count) + " has been sent"
print "to quit type quit"
thread.start_new_thread(send_data, ())
while True:
var = raw_input("Enter something: ")
if var == "quit":
killed = True
Несколько вопросов, есть ли лучший способ позволить потоку умереть через 60 секунд, кроме опроса time.clock каждый раз? Когда я запускаю эту программу, она отправляет байты правильно, но когда я набрал quit, другой поток не умрет, даже если я установил var kill = True. Интересно, почему это? сфера действия var Killed должна доходить до другой темы, верно?
Спасибо
7 ответов
Я рекомендовал использовать модуль потоков. Еще большим преимуществом является использование InterruptableThread для завершения потока. Вам не нужно использовать флаг для завершения вашего потока, но исключение произойдет, если вы вызовите terminate() в этом потоке из parent. Вы можете обрабатывать исключения или нет.
import threading, ctypes
class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
def raise_exc(self, excobj):
assert self.isAlive(), "thread must be started"
for tid, tobj in threading._active.items():
if tobj is self:
self._async_raise(tid, excobj)
return
def terminate(self):
self.raise_exc(SystemExit)
РЕДАКТИРОВАТЬ: Вы можете переписать свой код, как это, используя другой поток, который ждет 1 минуту, а затем убивает другой поток
def send_data:
IP = ...
# other vars
...
s = socket.socket(.....)
# no killed checking
# no time checking
# just do your work here
...
s.close()
my_thread = InterruptableThread(target=send_data)
my_thread.start()
def one_minute_kill(who):
time.sleep(60)
who.terminate()
killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()
print "to quit type quit"
while my_thread.isAlive():
if raw_input("Enter something: ") == "quit":
my_thread.terminate()
Я не знаю, как это сделать с модулем "поток", но я могу сделать это с помощью модуля "поток". Я думаю, что этот код выполняет то, что вы хотите.
Для документации по модулю потоков: http://docs.python.org/library/threading.html
#!/usr/bin/python
import time
from threading import Thread
import threading
import sys
test_time = 10
killed = False
class SillyThread( threading.Thread ):
def run(self):
global killed
starttime = time.time()
counter = 0
while (time.time() - starttime) < test_time:
if killed:
break
counter = counter + 1
time.sleep(0.1)
print "I did %d loops" % counter
class ManageThread( threading.Thread ):
def run(self):
global killed
while True:
var = raw_input("Enter something: ")
if var == "quit":
killed = True
break
print "Got var [%s]" % var
silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)
Обратите внимание, что я использую time.time() вместо time.clock(). time.clock() показывает истекшее время процессора в Unix (см. http://docs.python.org/library/time.html). Я думаю, time.clock () должен работать везде. Я установил свой test_time на 10 секунд, потому что у меня нет терпения ни на минуту.
Вот что произойдет, если я позволю ему запустить все 10 секунд:
leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye
Вот что произойдет, если я наберу 'quit':
leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye
Надеюсь это поможет.
Как уже упоминалось выше, используйте модуль потоков, он намного проще в использовании и предоставляет несколько примитивов синхронизации. Он также предоставляет класс Timer, который запускается через указанное количество времени.
Если вы просто хотите, чтобы программа выходила, вы можете просто сделать отправляющий поток демоном. Вы делаете это, вызывая setDaemon(True) перед вызовом start() (вместо 2.6 может использоваться атрибут daemon). Python не выйдет, пока работает поток, не являющийся демоном.
Вы можете сделать это довольно легко без потоков. Например, используя Twisted, вы просто настраиваете синхронизированный вызов и продюсера:
from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor
class Noisy(Protocol):
def __init__(self, delay, data):
self.delay = delay
self.data = data
def stop(self):
self.transport.unregisterProducer()
self.transport.loseConnection()
reactor.stop()
def resumeProducing(self):
self.transport.write(self.data)
def connectionMade(self):
self.transport.registerProducer(self, False)
reactor.callLater(self.delay, self.stop)
factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()
Это имеет различные преимущества перед многопоточным подходом. Он не полагается на потоки демона, поэтому вы можете на самом деле очистить сетевое соединение (например, чтобы отправить закрытое сообщение при необходимости), а не полагаться на платформу для его уничтожения. Он обрабатывает весь фактический сетевой код низкого уровня для вас (ваш исходный пример делает неправильно в случае socket.send, возвращающего 0; этот код будет обрабатывать этот случай должным образом). Вам также не нужно полагаться на ctypes или непонятный API-интерфейс CPython для вызова исключения в другом потоке (так что он переносим на большее количество версий Python и может фактически прервать заблокированную передачу, в отличие от некоторых других предлагаемых подходов).
Убедитесь, что "выход" работает правильно, и добавьте мелкий шрифт, чтобы проверить, работает ли ввод.
if var == "quit":
print "Hey we got quit"
Это легко проверить объем killed
:
>>> import thread
>>> killed = False
>>> import time
>>> def test():
... while True:
... time.sleep(1)
... if killed:
... print 'Dead.'
... break
...
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.
Прошедшая переменная не инициализируется. Установите его на ноль выше цикла while.