Система событий в Python
Какую систему событий для Python вы используете? Я уже знаю о pydispatcher, но мне было интересно, что еще можно найти, или обычно используется?
Меня не интересуют менеджеры событий, которые являются частью больших фреймворков, я бы предпочел использовать небольшое решение, которое я могу легко расширить.
16 ответов
Подводя итог различным системам событий, которые упомянуты в ответах здесь:
Самым основным стилем системы событий является "пакет методов обработки", который представляет собой простую реализацию шаблона Observer. По сути, методы-обработчики (callables) хранятся в массиве и каждый из них вызывается, когда событие "срабатывает".
- zope.event показывает, как это работает (см . ответ Леннарта). Примечание: этот пример даже не поддерживает аргументы обработчика.
- Реализация LongPoke 'callable list' показывает, что такая система событий может быть реализована очень минималистично с помощью подклассов
list
, - EventHook от spassig (шаблон событий Майкла Фурда) является простой реализацией.
- Класс Josip's Valued Lessons Event в основном такой же, но использует
set
вместоlist
хранить сумку и инвентарь__call__
которые являются разумными дополнениями. - PyNotify похожа по своей концепции, а также предоставляет дополнительные понятия о переменных и условиях ("событие изменения переменной").
- По сути, axel - это мешок с обработчиками, с большим количеством функций, связанных с многопоточностью, обработкой ошибок,...
Недостаток этих систем событий заключается в том, что вы можете зарегистрировать обработчики только для реального объекта Event (или списка обработчиков). Таким образом, во время регистрации событие уже должно существовать.
Вот почему существует второй стиль систем событий: шаблон публикации-подписки. Здесь обработчики регистрируются не в объекте события (или списке обработчиков), а в центральном диспетчере. Также уведомители общаются только с диспетчером. Что слушать или что публиковать, определяется "сигналом", который является не чем иным, как именем (строкой).
- Blinker имеет несколько отличных функций, таких как автоматическое отключение и фильтрация по отправителю.
- PyPubSub на первый взгляд кажется довольно простым.
- PyDispatcher, кажется, подчеркивает гибкость в отношении публикации "многие ко многим" и т. Д.
- louie - переработанный PyDispatcher, "обеспечивающий инфраструктуру плагинов, включая поддержку Twisted и PyQt". Похоже, что потерял обслуживание после января 2016 года.
- django.dispatch - это переписанный PyDispatcher "с более ограниченным интерфейсом, но более высокой производительностью".
- Сигналы и слоты Qt доступны в PyQt или PySide. Они работают как обратный вызов, когда используются в одном потоке, или как события (используя цикл событий) между двумя разными потоками. Сигналы и слоты имеют ограничение, что они работают только в объектах классов, которые являются производными от
QObject
,
Примечание: threading.Event не является "системой событий" в вышеприведенном смысле. Это система синхронизации потоков, в которой один поток ожидает, пока другой поток не "сигнализирует" объект Event.
Примечание: выше еще не включены pypydispatcher, python-dispatch, и может также представлять интерес "система ловушек" pluggy.
Я делал это так:
class Event(list):
"""Event subscription.
A list of callable objects. Calling an instance of this will cause a
call to each item in the list in ascending order by index.
Example Usage:
>>> def f(x):
... print 'f(%s)' % x
>>> def g(x):
... print 'g(%s)' % x
>>> e = Event()
>>> e()
>>> e.append(f)
>>> e(123)
f(123)
>>> e.remove(f)
>>> e()
>>> e += (f, g)
>>> e(10)
f(10)
g(10)
>>> del e[0]
>>> e(2)
g(2)
"""
def __call__(self, *args, **kwargs):
for f in self:
f(*args, **kwargs)
def __repr__(self):
return "Event(%s)" % list.__repr__(self)
Однако, как и со всем остальным, что я видел, для этого не существует автоматически сгенерированного pydoc и подписей, которые действительно отстой.
Мы используем EventHook, как это было предложено Майклом Фурдом в его модели событий:
Просто добавьте EventHooks в ваши классы с помощью:
class MyBroadcaster()
def __init__():
self.onChange = EventHook()
theBroadcaster = MyBroadcaster()
# add a listener to the event
theBroadcaster.onChange += myFunction
# remove listener from the event
theBroadcaster.onChange -= myFunction
# fire event
theBroadcaster.onChange.fire()
Мы добавили функциональность для удаления всех слушателей из объекта в класс Michaels и получили следующее:
class EventHook(object):
def __init__(self):
self.__handlers = []
def __iadd__(self, handler):
self.__handlers.append(handler)
return self
def __isub__(self, handler):
self.__handlers.remove(handler)
return self
def fire(self, *args, **keywargs):
for handler in self.__handlers:
handler(*args, **keywargs)
def clearObjectHandlers(self, inObject):
for theHandler in self.__handlers:
if theHandler.im_self == inObject:
self -= theHandler
Я использую zope.event. Это самые голые кости, которые вы можете себе представить.:-) На самом деле, вот полный исходный код:
subscribers = []
def notify(event):
for subscriber in subscribers:
subscriber(event)
Обратите внимание, что вы не можете отправлять сообщения между процессами, например. Это не система обмена сообщениями, просто система событий, ни больше, ни меньше.
Я нашел этот небольшой сценарий на Ценных уроках. Кажется, у меня просто правильное соотношение простоты и мощности. Питер Тэтчер является автором следующего кода (лицензирование не упоминается).
class Event:
def __init__(self):
self.handlers = set()
def handle(self, handler):
self.handlers.add(handler)
return self
def unhandle(self, handler):
try:
self.handlers.remove(handler)
except:
raise ValueError("Handler is not handling this event, so cannot unhandle it.")
return self
def fire(self, *args, **kargs):
for handler in self.handlers:
handler(*args, **kargs)
def getHandlerCount(self):
return len(self.handlers)
__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__ = getHandlerCount
class MockFileWatcher:
def __init__(self):
self.fileChanged = Event()
def watchFiles(self):
source_path = "foo"
self.fileChanged(source_path)
def log_file_change(source_path):
print "%r changed." % (source_path,)
def log_file_change2(source_path):
print "%r changed!" % (source_path,)
watcher = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Вот минимальный дизайн, который должен работать нормально. Что вам нужно сделать, это просто наследовать Observer
в классе, а затем использовать observe(event_name, callback_fn)
слушать конкретное событие. Всякий раз, когда это конкретное событие происходит в любом месте кода (т.е. Event('USB connected')
), соответствующий обратный вызов сработает.
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
Пример:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
Вы можете взглянуть на pymitter ( pypi). Это небольшой однофайловый (~250 loc) подход, "обеспечивающий пространства имен, подстановочные знаки и TTL".
Вот основной пример:
from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg
# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Я создал EventManager
класс (код в конце). Синтаксис следующий:
#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )
#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )
#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )
#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun
#Delete an event
del EventManager.eventName
#Fire the event
EventManager.eventName()
Вот пример:
def hello(name):
print "Hello {}".format(name)
def greetings(name):
print "Greetings {}".format(name)
EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello
print "\nInitial salute"
EventManager.salute('Oscar')
print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')
Выход:
Первоначальный салют
Привет Оскар
Привет оскарСейчас снимаю поздравления
Привет оскар
Код EventManger:
class EventManager:
class Event:
def __init__(self,functions):
if type(functions) is not list:
raise ValueError("functions parameter has to be a list")
self.functions = functions
def __iadd__(self,func):
self.functions.append(func)
return self
def __isub__(self,func):
self.functions.remove(func)
return self
def __call__(self,*args,**kvargs):
for func in self.functions : func(*args,**kvargs)
@classmethod
def addEvent(cls,**kvargs):
"""
addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
creates events using **kvargs to create any number of events. Each event recieves a list of functions,
where every function in the list recieves the same parameters.
Example:
def hello(): print "Hello ",
def world(): print "World"
EventManager.addEvent( salute = [hello] )
EventManager.salute += world
EventManager.salute()
Output:
Hello World
"""
for key in kvargs.keys():
if type(kvargs[key]) is not list:
raise ValueError("value has to be a list")
else:
kvargs[key] = cls.Event(kvargs[key])
cls.__dict__.update(kvargs)
Я сделал вариант минималистичного подхода Longpoke, который также обеспечивает подписи как для вызывающих, так и для вызывающих абонентов:
class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}
>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)
>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()
Если я делаю код в pyQt, я использую парадигму сокетов / сигналов QT, то же самое относится и к django
Если я делаю асинхронный ввод-вывод, используйте родной модуль выбора
Если я использую синтаксический анализатор SAX Python, я использую API событий, предоставляемый SAX. Так что, похоже, я жертва базового API:-)
Возможно, вам следует спросить себя, что вы ожидаете от фреймворка / модуля событий. Лично я предпочитаю использовать парадигму Socket/Signal от QT. больше информации об этом можно найти здесь
Если вы хотите сделать более сложные вещи, такие как объединение событий или повторение, вы можете использовать шаблон Observable и зрелую библиотеку, которая реализует это. https://github.com/ReactiveX/RxPY. Observables очень распространены в Javascript и Java, и их очень удобно использовать для некоторых асинхронных задач.
from rx import Observable, Observer
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())
ВЫХОД:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
Если вам нужна шина событий, которая работает через границы процессов или сетей, вы можете попробовать PyMQ. В настоящее время он поддерживает публикации / подписки, очереди сообщений и синхронный RPC. Версия по умолчанию работает поверх серверной части Redis, поэтому вам понадобится работающий сервер Redis. Также есть бэкэнд в памяти для тестирования. Вы также можете написать свой собственный бэкэнд.
import pymq
# common code
class MyEvent:
pass
# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
print('event received')
# publisher code
pymq.publish(MyEvent())
# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')
Для инициализации системы:
from pymq.provider.redis import RedisConfig
# starts a new thread with a Redis event loop
pymq.init(RedisConfig())
# main application control loop
pymq.shutdown()
Отказ от ответственности: я являюсь автором этой библиотеки
Вот еще один модуль для рассмотрения. Это кажется приемлемым выбором для более требовательных приложений.
Py-notify - это пакет Python, предоставляющий инструменты для реализации шаблона программирования Observer. Эти инструменты включают сигналы, условия и переменные.
Сигналы - это списки обработчиков, которые вызываются при излучении сигнала. Условия в основном являются булевыми переменными в сочетании с сигналом, который излучается при изменении состояния условия. Они могут быть объединены с использованием стандартных логических операторов (не, и т. Д.) В составные условия. Переменные, в отличие от условий, могут содержать любой объект Python, не только логические, но они не могут быть объединены.
Еще один удобный пакет — events. Он инкапсулирует ядро для подписки на события и запуска событий и выглядит как «естественная» часть языка. Он похож на язык C#, который предоставляет удобный способ объявлять события, подписываться на них и запускать их. Технически событие представляет собой «слот», к которому могут быть присоединены функции обратного вызова (обработчики событий) — процесс, называемый подпиской на событие.
# Define a callback function
def something_changed(reason):
print "something changed because %s" % reason
# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed
Когда событие запускается, последовательно вызываются все подключенные обработчики событий. Чтобы запустить событие, выполните вызов слота:
events.on_change('it had to happen')
Это выведет:
'something changed because it had to happen'
Дополнительную документацию можно найти в репозитории github или в документации .
Ты можешь попробовать buslane
модуль.
Эта библиотека облегчает реализацию системы, основанной на сообщениях. Он поддерживает команды (один обработчик) и события (0 или несколько обработчиков). Buslane использует аннотации типа Python для правильной регистрации обработчика.
Простой пример:
from dataclasses import dataclass
from buslane.commands import Command, CommandHandler, CommandBus
@dataclass(frozen=True)
class RegisterUserCommand(Command):
email: str
password: str
class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):
def handle(self, command: RegisterUserCommand) -> None:
assert command == RegisterUserCommand(
email='john@lennon.com',
password='secret',
)
command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
email='john@lennon.com',
password='secret',
))
Чтобы установить buslane, просто используйте pip:
$ pip install buslane
Некоторое время назад я написал библиотеку, которая может быть полезна для вас. Это позволяет вам иметь локальных и глобальных слушателей, несколько разных способов их регистрации, приоритет выполнения и так далее.
from pyeventdispatcher import register
register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)
dispatch(Event("foo.bar", {"id": 1}))
# first second
Посмотрите pyeventdispatcher