Завершить нить USB-детектора, используя монитор от pyudev
У меня есть скрипт Python, который я запускаю на удаленном устройстве. Будут созданы две разные темы. Первый поток создается для мониторинга USB-подключений к устройству.
class USBDetector(threading.Thread):
''' Monitor udev for detection of usb '''
def run(self):
''' Runs the actual loop to detect the events '''
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.monitor.filter_by(subsystem='usb')
self.monitor.start()
for device in iter(self.monitor.poll, None):
if device.action == 'add':
# some action to run on insertion of usb
Я пытался вставить оператор break, если состояние глобальной переменной изменяется. Но это не сработало. что-то простое, как
if TERMINATE == True:
break
Я посмотрел на https://pyudev.readthedocs.io/en/latest/api/pyudev.html и через чтение это выглядит как этот раздел кода
for device in iter(self.monitor.poll, None):
if device.action == 'add':
# some function to run on insertion of usb
является бесконечным циклом, если вместо None не вставлено время ожидания. Я хочу убить поток, когда другой поток заканчивается. Если я дам команду quit для моего основного потока, этот usbdetector просто продолжит работать. Любые предложения о том, как это остановить?
(ОБНОВИТЬ)
Привет,
извините, я пошел с низким техническим способом решения моей проблемы на данный момент.
Если кто-нибудь знает, как выйти из этого цикла без необходимости второго цикла, дайте мне знать
def run(self):
''' Runs the actual loop to detect the events '''
global terminate
self.rmmod_Module()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.monitor.filter_by(subsystem='usb')
self.monitor.start()
count = 0
while not terminate:
count = count + 1
print count
for device in iter(partial(self.monitor.poll, 3), None):
if device.action == 'add':
# some function to run on insertion of usb
очевидно, что у меня есть цикл for, вложенный в цикл while, ожидающий завершения истины. Это просто и работает, однако все же хотелось бы знать, есть ли способ вывести устройство из цикла в цикл iter().
1 ответ
Это может быть не тот прямой ответ, который вы ищете. Вместо синхронного мониторинга портов USB с помощью опроса, почему бы не использовать асинхронные обратные вызовы, как показано в следующем руководстве по устройству мониторинга pyudev в разделе Асинхронный мониторинг¶.
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by('block')
def log_event(action, device):
if 'ID_FS_TYPE' in device:
with open('filesystems.log', 'a+') as stream:
print('{0} - {1}'.format(action, device.get('ID_FS_LABEL')), file=stream)
observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()
Из фрагмента кода вы можете получить несколько обратных вызовов для одного действия USB-устройства, поскольку оно может распознавать одно USB-устройство как несколько устройств. Собрав все это вместе, вы могли бы сделать что-то вроде следующего.
class USBDetector():
''' Monitor udev for detection of usb '''
def run(self):
''' Runs the actual loop to detect the events '''
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.monitor.filter_by(subsystem='usb')
self.observer = pyudev.MonitorObserver(self.monitor, self.usbDeviceEventHandler)
self.observer.start()
def usbDeviceEventHandler(self, action, device):
if device.action == 'add':
# some function to run on insertion of usb
Вы можете получить несколько обратных вызовов для одного действия usb, поэтому вы можете реализовать блокировку потоков с помощью Thread.lock(), а также получать доступ и редактировать переменную времени и принимать только новые обратные вызовы каждую секунду. Надеюсь, это поможет, извините за поздний ответ.