Есть ли способ убить нить?
Можно ли завершить работающий поток без установки / проверки каких-либо флагов / семафоров / и т. Д.?
31 ответ
Как правило, плохой шаблон - внезапное завершение потока в Python и на любом языке. Подумайте о следующих случаях:
- поток содержит критический ресурс, который должен быть правильно закрыт
- поток создал несколько других потоков, которые также должны быть уничтожены.
Хороший способ справиться с этим, если вы можете себе это позволить (если вы управляете своими собственными потоками), - это иметь флаг exit_request, который каждый поток проверяет через регулярные промежутки времени, чтобы узнать, пора ли ему выйти.
Например:
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
В этом коде вы должны вызывать stop() для потока, когда вы хотите, чтобы он завершился, и ждать, пока поток завершит работу корректно, используя join(). Поток должен регулярно проверять флаг остановки.
Однако бывают случаи, когда вам действительно нужно убить поток. Например, когда вы оборачиваете внешнюю библиотеку, которая занята для длительных вызовов, и хотите прервать ее.
Следующий код позволяет (с некоторыми ограничениями) вызвать исключение в потоке Python:
def _async_raise(tid, exctype):
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadWithExc(threading.Thread):
'''A thread class that supports raising exception in the thread from
another thread.
'''
def _get_my_tid(self):
"""determines this (self's) thread id
CAREFUL : this function is executed in the context of the caller
thread, to get the identity of the thread represented by this
instance.
"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")
# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id
# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid
# TODO: in python 2.6, there's a simpler way to do : self.ident
raise AssertionError("could not determine the thread's id")
def raiseExc(self, exctype):
"""Raises the given exception type in the context of this thread.
If the thread is busy in a system call (time.sleep(),
socket.accept(), ...), the exception is simply ignored.
If you are sure that your exception should terminate the thread,
one way to ensure that it works is:
t = ThreadWithExc( ... )
...
t.raiseExc( SomeException )
while t.isAlive():
time.sleep( 0.1 )
t.raiseExc( SomeException )
If the exception is to be caught by the thread, you need a way to
check that your thread has caught it.
CAREFUL : this function is executed in the context of the
caller thread, to raise an excpetion in the context of the
thread represented by this instance.
"""
_async_raise( self._get_my_tid(), exctype )
(По материалам Killable Threads Томера Филиба. Цитата о возвращаемом значении PyThreadState_SetAsyncExc
кажется, из старой версии Python.)
Как отмечено в документации, это не волшебная палочка, потому что, если поток занят вне интерпретатора Python, он не будет перехватывать прерывание.
Хорошая схема использования этого кода - заставить поток перехватить определенное исключение и выполнить очистку. Таким образом, вы можете прервать задачу и при этом выполнить надлежащую очистку.
multiprocessing.Process
Можно p.terminate()
В тех случаях, когда я хочу уничтожить поток, но не хочу использовать флаги / блокировки / сигналы / семафоры / события / что угодно, я продвигаю потоки в полномасштабные процессы. Для кода, который использует только несколько потоков, издержки не так уж и плохи.
Например, это удобно, чтобы легко завершать вспомогательные "потоки", которые выполняют блокирующий ввод / вывод.
Преобразование тривиально: в связанном коде заменить все threading.Thread
с multiprocessing.Process
и все queue.Queue
с multiprocessing.Queue
и добавить необходимые вызовы p.terminate()
к вашему родительскому процессу, который хочет убить своего ребенка p
Нет официального API для этого, нет.
Вам нужно использовать API платформы для уничтожения потока, например, pthread_kill или TerminateThread. Вы можете получить доступ к такому API, например, через pythonwin или через ctypes.
Обратите внимание, что это небезопасно. Скорее всего, это приведет к необратимому мусору (из локальных переменных стековых фреймов, которые становятся мусором) и может привести к взаимным блокировкам, если уничтожаемый поток имеет GIL в момент, когда он завершается.
Если вы пытаетесь завершить всю программу, вы можете установить поток как "демон". см. Thread.daemon
Как уже упоминалось, нормой является установка флага остановки. Для чего-то более легкого (без подклассов Thread, без глобальной переменной), лямбда-обратный вызов является опцией. (Обратите внимание на круглые скобки в if stop()
.)
import threading
import time
def do_work(id, stop):
print("I am thread", id)
while True:
print("I am thread {} doing something".format(id))
if stop():
print(" Exiting loop.")
break
print("Thread {}, signing off".format(id))
def main():
stop_threads = False
workers = []
for id in range(0,3):
tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
workers.append(tmp)
tmp.start()
time.sleep(3)
print('main: done sleeping; time to stop the threads.')
stop_threads = True
for worker in workers:
worker.join()
print('Finis.')
if __name__ == '__main__':
main()
Замена print()
с pr()
функция, которая всегда сбрасывает (sys.stdout.flush()
) может повысить точность вывода оболочки.
(Проверено только на Windows/Eclipse/Python3.3)
В Python вы просто не можете уничтожить поток напрямую.
Если вам НЕ действительно нужен поток (!), То вместо использования пакетапотоков вы можете использовать многопроцессорный пакет. Здесь, чтобы убить процесс, вы можете просто вызвать метод:
yourProcess.terminate() # kill the process!
Python убьет ваш процесс (в Unix через сигнал SIGTERM, а в Windows через TerminateProcess()
вызов). Обратите внимание, чтобы использовать его при использовании очереди или трубы! (это может повредить данные в очереди / канале)
Обратите внимание, что multiprocessing.Event
и multiprocessing.Semaphore
работать точно так же, как threading.Event
и threading.Semaphore
соответственно. Фактически, первые - клоны последних.
Если вам ДЕЙСТВИТЕЛЬНО нужно использовать поток, нет способа уничтожить его напрямую. Однако вы можете использовать "поток демона". На самом деле, в Python поток может быть помечен как демон:
yourThread.daemon = True # set the Thread as a "daemon thread"
Основная программа завершит работу, когда не останется живых потоков, не являющихся демонами. Другими словами, когда ваш основной поток (который, конечно, не является потоком демона) завершит свои операции, программа завершит работу, даже если все еще работают некоторые потоки демона.
Обратите внимание, что необходимо установить поток как daemon
перед start()
метод называется!
Конечно, вы можете и должны использовать daemon
даже с multiprocessing
, Здесь, когда основной процесс завершается, он пытается завершить все свои демонические дочерние процессы.
Наконец, обратите внимание, что sys.exit()
а также os.kill()
это не выбор.
Это основано на thread2 - убиваемые темы (рецепт Python)
Вам нужно вызвать PyThreadState_SetasyncExc(), который доступен только через ctypes.
Это было протестировано только на Python 2.7.3, но, вероятно, будет работать с другими недавними выпусками 2.x.
import ctypes
def terminate_thread(thread):
"""Terminates a python thread from another thread.
:param thread: a threading.Thread instance
"""
if not thread.isAlive():
return
exc = ctypes.py_object(SystemExit)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), exc)
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
Вы никогда не должны принудительно убивать поток, не сотрудничая с ним.
Уничтожение потока снимает любые гарантии того, что блоки try / finally настроены так, что вы можете оставить блокировки заблокированными, открытыми файлами и т. Д.
Единственный раз, когда вы можете утверждать, что принудительное уничтожение потоков - это хорошая идея, это быстрое уничтожение программы, но не отдельных потоков.
Если вы явно звоните time.sleep()
как часть вашей цепочки (скажем, опрос некоторых внешних служб), усовершенствованием метода Филиппа является использование тайм-аута в event
"s wait()
метод везде, где вы sleep()
Например:
import threading
class KillableThread(threading.Thread):
def __init__(self, sleep_interval=1):
super().__init__()
self._kill = threading.Event()
self._interval = sleep_interval
def run(self):
while True:
print("Do Something")
# If no kill signal is set, sleep for the interval,
# If kill signal comes in while sleeping, immediately
# wake up and handle
is_killed = self._kill.wait(self._interval)
if is_killed:
break
print("Killing Thread")
def kill(self):
self._kill.set()
Затем запустить его
t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread
Преимущество использования wait()
вместо sleep()
Если вы регулярно проверяете событие, вы можете запрограммировать его на более длительные интервалы сна, поток останавливается почти сразу (в противном случае sleep()
ing) и, на мой взгляд, код для обработки выхода значительно проще.
Вы можете убить поток, установив трассировку в поток, который выйдет из потока. Смотрите прикрепленную ссылку для одной возможной реализации.
Определенно возможно реализовать Thread.stop
метод, как показано в следующем примере кода:
import sys
import threading
import time
class StopThread(StopIteration):
pass
threading.SystemExit = SystemExit, StopThread
class Thread2(threading.Thread):
def stop(self):
self.__stop = True
def _bootstrap(self):
if threading._trace_hook is not None:
raise ValueError('Cannot run thread with tracing!')
self.__stop = False
sys.settrace(self.__trace)
super()._bootstrap()
def __trace(self, frame, event, arg):
if self.__stop:
raise StopThread()
return self.__trace
class Thread3(threading.Thread):
def _bootstrap(self, stop_thread=False):
def stop():
nonlocal stop_thread
stop_thread = True
self.stop = stop
def tracer(*_):
if stop_thread:
raise StopThread()
return tracer
sys.settrace(tracer)
super()._bootstrap()
###############################################################################
def main():
test1 = Thread2(target=printer)
test1.start()
time.sleep(1)
test1.stop()
test1.join()
test2 = Thread2(target=speed_test)
test2.start()
time.sleep(1)
test2.stop()
test2.join()
test3 = Thread3(target=speed_test)
test3.start()
time.sleep(1)
test3.stop()
test3.join()
def printer():
while True:
print(time.time() % 1)
time.sleep(0.1)
def speed_test(count=0):
try:
while True:
count += 1
except StopThread:
print('Count =', count)
if __name__ == '__main__':
main()
Thread3
Похоже, что класс выполняет код примерно на 33% быстрее, чем Thread2
учебный класс.
Лучше, если вы не убьете нить. Можно было бы ввести блок try в цикл потока и вызвать исключение, когда вы хотите остановить поток (например, break / return /..., который останавливает ваш for /while/...). Я использовал это в своем приложении, и это работает...
Я опаздываю к этой игре, но я боролся с подобным вопросом, и следующее, кажется, как решило проблему для меня идеально, так и позволило мне выполнить некоторую базовую проверку и очистку состояния потока при выходе из демонизированного подпотока:
import threading
import time
import atexit
def do_work():
i = 0
@atexit.register
def goodbye():
print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
(i, threading.currentThread().ident))
while True:
print i
i += 1
time.sleep(1)
t = threading.Thread(target=do_work)
t.daemon = True
t.start()
def after_timeout():
print "KILL MAIN THREAD: %s" % threading.currentThread().ident
raise SystemExit
threading.Timer(2, after_timeout).start()
Урожайность:
0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]
Для обхода потока можно использовать следующий обходной путь:
kill_threads = False
def doSomething():
global kill_threads
while True:
if kill_threads:
thread.exit()
......
......
thread.start_new_thread(doSomething, ())
Это может быть использовано даже для завершения потоков, чей код написан в другом модуле, из основного потока. Мы можем объявить глобальную переменную в этом модуле и использовать ее для завершения потоков, порожденных в этом модуле.
Я обычно использую это для завершения всех потоков при выходе из программы. Это может быть не идеальный способ завершить поток (ы), но может помочь.
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))
т твой Thread
объект.
Прочитайте исходник Python (Modules/threadmodule.c
а также Python/thread_pthread.h
) вы можете увидеть Thread.ident
является pthread_t
типа, так что вы можете сделать что-нибудь pthread
можно сделать в Python использовать libpthread
,
Вот еще один способ сделать это, но с чрезвычайно чистым и простым кодом, который будет работать в Python 3.7 в 2021 году:
import ctypes
def kill_thread(thread):
"""
thread: a threading.Thread object
"""
thread_id = thread.ident
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
print('Exception raise failure')
Адаптировано отсюда: https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/
Одна вещь, которую я хочу добавить, это то, что если вы читаете официальную документацию в потоке lib Python, рекомендуется избегать использования "демонических" потоков, когда вы не хотите, чтобы потоки заканчивались внезапно, с флагом, упомянутым Паоло Ровелли.
Из официальной документации:
Потоки демона внезапно останавливаются при завершении работы. Их ресурсы (такие как открытые файлы, транзакции базы данных и т. Д.) Могут быть освобождены неправильно. Если вы хотите, чтобы ваши потоки корректно останавливались, сделайте их недемоническими и используйте подходящий механизм сигнализации, такой как Event.
Я думаю, что создание демонических потоков зависит от вашего приложения, но в целом (и на мой взгляд) лучше не убивать их или делать их демоническими. В многопроцессорной обработке вы можете использовать is_alive()
проверить состояние процесса и завершить их (также вы избежите проблем с GIL). Но иногда вы можете найти больше проблем, когда выполняете свой код в Windows.
И всегда помните, что если у вас есть "живые потоки", интерпретатор Python будет их ждать. (Из-за этого демон может помочь вам, если не имеет значения, внезапно заканчивается).
Для этого есть библиотека, стопит. Хотя некоторые из тех же предостережений, перечисленных здесь, все еще применяются, по крайней мере, эта библиотека представляет регулярный, повторяемый метод для достижения поставленной цели.
Версия Python: 3.8
Используя поток демона для выполнения того, что мы хотели, если мы хотим, чтобы поток демона был завершен, все, что нам нужно, это выполнить выход из родительского потока, тогда система завершит поток демона, созданный родительским потоком.
Также поддерживает сопрограммы и функции сопрограмм.
def main():
start_time = time.perf_counter()
t1 = ExitThread(time.sleep, (10,), debug=False)
t1.start()
time.sleep(0.5)
t1.exit()
try:
print(t1.result_future.result())
except concurrent.futures.CancelledError:
pass
end_time = time.perf_counter()
print(f"time cost {end_time - start_time:0.2f}")
ниже исходный код ExitThread
import concurrent.futures
import threading
import typing
import asyncio
class _WorkItem(object):
""" concurrent\futures\thread.py
"""
def __init__(self, future, fn, args, kwargs, *, debug=None):
self._debug = debug
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if self._debug:
print("ExitThread._WorkItem run")
if not self.future.set_running_or_notify_cancel():
return
try:
coroutine = None
if asyncio.iscoroutinefunction(self.fn):
coroutine = self.fn(*self.args, **self.kwargs)
elif asyncio.iscoroutine(self.fn):
coroutine = self.fn
if coroutine is None:
result = self.fn(*self.args, **self.kwargs)
else:
result = asyncio.run(coroutine)
if self._debug:
print("_WorkItem done")
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
class ExitThread:
""" Like a stoppable thread
Using coroutine for target then exit before running may cause RuntimeWarning.
"""
def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
, args=(), kwargs={}, *, daemon=None, debug=None):
#
self._debug = debug
self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
, daemon=daemon)
self._child_daemon_thread = None
self.result_future = concurrent.futures.Future()
self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
self._parent_thread_exit_lock = threading.Lock()
self._parent_thread_exit_lock.acquire()
self._parent_thread_exit_lock_released = False # When done it will be True
self._started = False
self._exited = False
self.result_future.add_done_callback(self._release_parent_thread_exit_lock)
def _parent_thread_run(self):
self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
, name="ExitThread_child_daemon_thread"
, daemon=True)
self._child_daemon_thread.start()
# Block manager thread
self._parent_thread_exit_lock.acquire()
self._parent_thread_exit_lock.release()
if self._debug:
print("ExitThread._parent_thread_run exit")
def _release_parent_thread_exit_lock(self, _future):
if self._debug:
print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
if not self._parent_thread_exit_lock_released:
self._parent_thread_exit_lock_released = True
self._parent_thread_exit_lock.release()
def _child_daemon_thread_run(self):
self._workItem.run()
def start(self):
if self._debug:
print(f"ExitThread.start {self._started}")
if not self._started:
self._started = True
self._parent_thread.start()
def exit(self):
if self._debug:
print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
if self._parent_thread_exit_lock_released:
return
if not self._exited:
self._exited = True
if not self.result_future.cancel():
if self.result_future.running():
self.result_future.set_exception(concurrent.futures.CancelledError())
Предполагая, что вы хотите иметь несколько потоков одной и той же функции, это ИМХО самая простая реализация, чтобы остановить один по идентификатору:
import time
from threading import Thread
def doit(id=0):
doit.stop=0
print("start id:%d"%id)
while 1:
time.sleep(1)
print(".")
if doit.stop==id:
doit.stop=0
break
print("end thread %d"%id)
t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))
t5.start() ; t6.start()
time.sleep(2)
doit.stop =5 #kill t5
time.sleep(2)
doit.stop =6 #kill t6
Здесь хорошо то, что у вас может быть несколько одинаковых и разных функций, и остановить их все с помощью functionname.stop
Если вы хотите иметь только один поток функции, вам не нужно запоминать идентификатор. Просто остановись, еслиdoit.stop
> 0.
Просто чтобы развить идею @SCB (которая была именно тем, что мне нужно) создать подкласс KillableThread с настраиваемой функцией:
from threading import Thread, Event
class KillableThread(Thread):
def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
super().__init__(None, target, name, args, kwargs)
self._kill = Event()
self._interval = sleep_interval
print(self._target)
def run(self):
while True:
# Call custom function with arguments
self._target(*self._args)
# If no kill signal is set, sleep for the interval,
# If kill signal comes in while sleeping, immediately
# wake up and handle
is_killed = self._kill.wait(self._interval)
if is_killed:
break
print("Killing Thread")
def kill(self):
self._kill.set()
if __name__ == '__main__':
def print_msg(msg):
print(msg)
t = KillableThread(10, print_msg, args=("hello world"))
t.start()
time.sleep(6)
print("About to kill thread")
t.kill()
Естественно, как и в случае с @SBC, поток не дожидается запуска нового цикла до остановки. В этом примере вы увидите сообщение "Killing Thread", напечатанное сразу после сообщения "About to kill thread", вместо того, чтобы ждать еще 4 секунды для завершения потока (поскольку мы уже спали 6 секунд).
Второй аргумент в конструкторе KillableThread - это ваша пользовательская функция (здесь print_msg). Аргумент args - это аргументы, которые будут использоваться здесь при вызове функции (("hello world")).
Хотя он довольно старый, для некоторых это может быть удобным решением:
Небольшой модуль, который расширяет функциональность модуля потока - позволяет одному потоку вызывать исключения в контексте другого потока. Поднимая
SystemExit
Вы можете, наконец, убить потоки Python.
import threading
import ctypes
def _async_raise(tid, excobj):
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
class Thread(threading.Thread):
def raise_exc(self, excobj):
assert self.isAlive(), "thread must be started"
for tid, tobj in threading._active.items():
if tobj is self:
_async_raise(tid, excobj)
return
# the thread was alive when we entered the loop, but was not found
# in the dict, hence it must have been already terminated. should we raise
# an exception here? silently ignore?
def terminate(self):
# must raise the SystemExit type, instead of a SystemExit() instance
# due to a bug in PyThreadState_SetAsyncExc
self.raise_exc(SystemExit)
Таким образом, он позволяет "потоку вызывать исключения в контексте другого потока", и, таким образом, завершенный поток может обрабатывать завершение без регулярной проверки флага прерывания.
Однако, согласно исходному коду, есть некоторые проблемы с этим кодом.
- Исключение будет возбуждено только при выполнении байт-кода Python. Если ваш поток вызывает встроенную / встроенную функцию блокировки, исключение будет вызвано только тогда, когда выполнение вернется к коду Python.
- Существует также проблема, если встроенная функция внутренне вызывает PyErr_Clear(), что эффективно отменит ожидающее исключение. Вы можете попытаться поднять его снова.
- Только типы исключений могут быть сгенерированы безопасно. Экземпляры исключений могут вызывать неожиданное поведение и поэтому ограничены.
- Например: t1.raise_exc(TypeError), а не t1.raise_exc(TypeError("бла")).
- ИМХО, это ошибка, и я сообщил об этом как об одном. Для получения дополнительной информации, http://mail.python.org/pipermail/python-dev/2006-August/068158.html
- Я попросил выставить эту функцию во встроенном модуле потока, но так как ctypes стал стандартной библиотекой (начиная с 2.5), и это
функция, скорее всего, не зависит от реализации, ее можно сохранить
неразоблаченный.
Альтернативой является использованиеsignal.pthread_kill
послать стоп-сигнал.
from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep
def target():
for num in count():
print(num)
sleep(1)
thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)
результат
0
1
2
3
4
[14]+ Stopped
Это плохой ответ, смотрите комментарии
Вот как это сделать:
from threading import *
...
for thread in enumerate():
if thread.isAlive():
try:
thread._Thread__stop()
except:
print(str(thread.getName()) + ' could not be terminated'))
Дайте ему несколько секунд, после чего ваш поток должен быть остановлен. Проверьте также thread._Thread__delete()
метод.
Я бы порекомендовал thread.quit()
метод для удобства. Например, если у вас есть сокет в вашей ветке, я бы рекомендовал создать quit()
метод в вашем классе дескриптора сокета, завершите сокет, затем запустите thread._Thread__stop()
внутри вашего quit()
,
Как уже упоминалось в @Kozyarchuk в ответ, установка трассировки работает. Поскольку этот ответ не содержал кода, вот рабочий, готовый к использованию пример:
import sys, threading, time
class TraceThread(threading.Thread):
def __init__(self, *args, **keywords):
threading.Thread.__init__(self, *args, **keywords)
self.killed = False
def start(self):
self._run = self.run
self.run = self.settrace_and_run
threading.Thread.start(self)
def settrace_and_run(self):
sys.settrace(self.globaltrace)
self._run()
def globaltrace(self, frame, event, arg):
return self.localtrace if event == 'call' else None
def localtrace(self, frame, event, arg):
if self.killed and event == 'line':
raise SystemExit()
return self.localtrace
def f():
while True:
print('1')
time.sleep(2)
print('2')
time.sleep(2)
print('3')
time.sleep(2)
t = TraceThread(target=f)
t.start()
time.sleep(2.5)
t.killed = True
Он останавливается после того, как напечатал 1
а также 2
. 3
не печатается.
Питер Хинтдженс, один из основателей проекта ØMQ, говорит, что использование ØMQ и избегание примитивов синхронизации, таких как блокировки, мьютексы, события и т. Д., Является самым безопасным и безопасным способом написания многопоточных программ:
http://zguide.zeromq.org/py:all
Это включает в себя сообщение дочернему потоку, что он должен отменить свою работу. Это можно сделать, оснастив поток ØMQ-сокетом и опросив на нем сокет, чтобы получить сообщение о том, что он должен быть отменен.
Ссылка также предоставляет пример многопоточного кода Python с ØMQ.
Запустите подпоток с помощью setDaemon(True).
def bootstrap(_filename):
mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.
t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)
while True:
t.start()
time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
print('Thread stopped')
break
Самый простой способ таков:
from threading import Thread
from time import sleep
def do_something():
global thread_work
while thread_work:
print('doing something')
sleep(5)
print('Thread stopped')
thread_work = True
Thread(target=do_something).start()
sleep(5)
thread_work = False
Вы можете выполнить свою команду в процессе, а затем уничтожить ее, используя идентификатор процесса. Мне нужно было синхронизировать два потока, один из которых не возвращается сам по себе.
processIds = []
def executeRecord(command):
print(command)
process = subprocess.Popen(command, stdout=subprocess.PIPE)
processIds.append(process.pid)
print(processIds[0])
#Command that doesn't return by itself
process.stdout.read().decode("utf-8")
return;
def recordThread(command, timeOut):
thread = Thread(target=executeRecord, args=(command,))
thread.start()
thread.join(timeOut)
os.kill(processIds.pop(), signal.SIGINT)
return;
Это похоже на работу с pywin32 на Windows 7
my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()