Как бы вы реализовали базовый цикл обработки событий?
Если вы работали с инструментарием GUI, вы знаете, что есть цикл обработки событий / основной цикл, который должен выполняться после того, как все сделано, и это будет поддерживать работу приложения и реагировать на различные события. Например, для Qt вы должны сделать это в main():
int main() {
QApplication app(argc, argv);
// init code
return app.exec();
}
Что в данном случае app.exec() является основным циклом приложения.
Очевидный способ реализовать цикл такого рода:
void exec() {
while (1) {
process_events(); // create a thread for each new event (possibly?)
}
}
Но это ограничивает процессор до 100% и практически бесполезен. Теперь, как я могу реализовать такой цикл событий, который реагирует, не потребляя при этом процессор?
Ответы приветствуются в Python и / или C++. Благодарю.
Сноска. Ради обучения я буду реализовывать свои собственные сигналы / слоты и использовать их для генерации пользовательских событий (например, go_forward_event(steps)
). Но если вы знаете, как я могу использовать системные события вручную, я бы тоже хотел узнать об этом.
7 ответов
Я много размышлял о том же самом!
Основной цикл GUI выглядит следующим образом в псевдокоде:
void App::exec() {
for(;;) {
vector<Waitable> waitables;
waitables.push_back(m_networkSocket);
waitables.push_back(m_xConnection);
waitables.push_back(m_globalTimer);
Waitable* whatHappened = System::waitOnAll(waitables);
switch(whatHappened) {
case &m_networkSocket: readAndDispatchNetworkEvent(); break;
case &m_xConnection: readAndDispatchGuiEvent(); break;
case &m_globalTimer: readAndDispatchTimerEvent(); break;
}
}
}
Что такое "Ждать"? Ну, это зависит от системы. В UNIX это называется "дескриптор файла", а "waitOnAll" - системный вызов::select. Так называемый vector<Waitable>
это ::fd_set
в UNIX, а "whatHappened" фактически запрашивается через FD_ISSET
, Фактические ожидаемые дескрипторы приобретаются различными способами, например m_xConnection
может быть взято из::XConnectionNumber(). X11 также предоставляет высокоуровневый, переносимый API для этого -::XNextEvent() - но если бы вы использовали это, вы не смогли бы ожидать нескольких источников событий одновременно.
Как работает блокировка? "waitOnAll" - это системный вызов, который указывает ОС поместить ваш процесс в "спящий список". Это означает, что вам не дается никакого времени ЦП, пока не произойдет событие в одной из ожидаемых. Это означает, что ваш процесс простаивает, потребляя 0% ЦП. Когда происходит событие, ваш процесс ненадолго отреагирует на него и вернется в состояние ожидания. Приложения с графическим интерфейсом почти все время проводят в режиме ожидания.
Что происходит со всеми циклами процессора во время сна? Зависит. Иногда другой процесс будет использовать их. Если нет, ваша ОС будет занята зацикливанием процессора, или переведет его во временный режим пониженного энергопотребления и т. Д.
Пожалуйста, попросите дополнительную информацию!
Python:
Вы можете посмотреть на реализацию Twisted реактора, который, вероятно, является лучшей реализацией для цикла событий в python. Реакторы в Twisted являются реализациями интерфейса, и вы можете указать тип реактора для запуска: select, epoll, kqueue (все основаны на ac api, использующем эти системные вызовы), есть также реакторы, основанные на наборах инструментов QT и GTK.
Простая реализация будет использовать select:
#echo server that accepts multiple client connections without forking threads
import select
import socket
import sys
host = ''
port = 50000
backlog = 5
size = 1024
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host,port))
server.listen(backlog)
input = [server,sys.stdin]
running = 1
#the eventloop running
while running:
inputready,outputready,exceptready = select.select(input,[],[])
for s in inputready:
if s == server:
# handle the server socket
client, address = server.accept()
input.append(client)
elif s == sys.stdin:
# handle standard input
junk = sys.stdin.readline()
running = 0
else:
# handle all other sockets
data = s.recv(size)
if data:
s.send(data)
else:
s.close()
input.remove(s)
server.close()
Вообще, я бы сделал это с помощью какого-то семафора:
- Семафор начинается с нуля.
- Цикл событий ожидает семафор.
- Приходят события, семафор увеличивается.
- Обработчик события разблокирует и уменьшает семафор и обрабатывает событие.
- Когда все события обработаны, семафор равен нулю и цикл событий снова блокируется.
Если вы не хотите усложнять ситуацию, вы можете просто добавить вызов sleep() в цикл while с тривиально малым временем ожидания. Это заставит ваш поток обработки сообщений передавать свое процессорное время другим потокам. Процессор больше не будет привязан к 100%, но это все еще довольно расточительно.
Я бы использовал простую и легкую библиотеку сообщений под названием ZeroMQ ( http://www.zeromq.org/). Это библиотека с открытым исходным кодом (LGPL). Это очень маленькая библиотека; на моем сервере весь проект компилируется примерно за 60 секунд.
ZeroMQ значительно упростит ваш управляемый событиями код, и это также самое эффективное решение с точки зрения производительности. Обмен данными между потоками с использованием ZeroMQ происходит намного быстрее (с точки зрения скорости), чем при использовании семафоров или локальных сокетов UNIX. ZeroMQ также является 100% переносимым решением, тогда как все остальные решения привязывают ваш код к конкретной операционной системе.
Вот цикл событий C++. При создании объекта
EventLoop
, он создает поток, который постоянно выполняет любую поставленную перед ним задачу. Если доступных задач нет, основной поток переходит в спящий режим, пока не будет добавлена какая-либо задача.
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <set>
#include <functional>
class NoElements : public std::runtime_error
{
public:
NoElements(const char* error)
: std::runtime_error(error)
{
}
};
template <typename Type>
struct SafeQueueCompare {
typedef std::chrono::time_point<std::chrono::system_clock, std::chrono::duration<double, std::ratio<1, 1000000000> > > Milliseconds;
typedef std::tuple<typename SafeQueueCompare<Type>::Milliseconds, Type> TimePoint;
bool operator()(const typename SafeQueueCompare<Type>::TimePoint left, const typename SafeQueueCompare<Type>::TimePoint right) {
return std::get<0>(left) < std::get<0>(right);
}
};
// An almost thread safe queue
template <typename Type>
struct SafeQueue
{
typedef std::multiset<
typename SafeQueueCompare<Type>::TimePoint,
SafeQueueCompare<Type>
> SafeQueueType;
SafeQueueType _queue;
bool _shutdown;
mutable std::mutex _mutex;
std::condition_variable _condition_variable;
SafeQueue()
: _shutdown(false)
{}
virtual ~SafeQueue()
{}
// Mutex and condition variables are not movable and there is no need for smart pointers yet
SafeQueue(const SafeQueue&) = delete;
SafeQueue& operator =(const SafeQueue&) = delete;
SafeQueue(const SafeQueue&&) = delete;
SafeQueue& operator =(const SafeQueue&&) = delete;
void shutdown() {
_shutdown = true;
_condition_variable.notify_all();
}
// Add an element to the queue
void enqueue(int timeout, Type element) {
std::lock_guard<std::mutex> lock(_mutex);
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
typename SafeQueueCompare<Type>::Milliseconds newtime = timenow + std::chrono::duration<double, std::ratio<1, 1000>>(timeout);
_queue.insert(std::make_tuple(newtime, element));
_condition_variable.notify_one();
}
// A blocking function which returns true when a new element can be consumed by get()
bool dequeue() {
std::unique_lock<std::mutex> lock(_mutex);
typename SafeQueueType::iterator timeriterator;
do {
if (_shutdown) {
return false;
}
// release lock as long as the wait and reacquire it afterwards
timeriterator = _queue.begin();
if ( timeriterator != _queue.end() ) {
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
typename SafeQueueCompare<Type>::TimePoint element = *timeriterator;
typename SafeQueueCompare<Type>::Milliseconds sleeptime = std::get<0>(element);
if ( sleeptime <= timenow ) {
break;
}
std::cv_status status = _condition_variable.wait_until(lock, sleeptime);
if ( std::cv_status::timeout == status ) {
break;
}
}
else {
_condition_variable.wait(lock);
}
} while ( true );
return true;
}
// Get the first-element
// This is not thread safe if there are multiple consumers
// In this case, you should hold a mutex before calling dequeue() and get()
Type get() {
std::unique_lock<std::mutex> lock(_mutex);
auto timeriterator = _queue.begin();
if ( timeriterator == _queue.end() ) {
throw NoElements("You cannot call get() on a empty queue. Please hold a mutex before calling dequeue() and get().");
}
if (_shutdown) {
throw std::runtime_error("You cannot call SafeQueue get() after shutting it down.");
}
Type element = std::get<1>(*timeriterator);
_queue.erase(timeriterator);
return element;
}
};
/**
* You can enqueue any thing with this event loop. Just use lambda functions, future and promises!
* With lambda `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
* With futures we can get values from the event loop:
* ```
* std::promise<int> accumulate_promise;
* event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
* std::future<int> accumulate_future = accumulate_promise.get_future();
* accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
* std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
* ```
* It is just not a nice ideia to add something which hang the whole event loop queue.
*/
template <class Type>
struct EventLoop {
SafeQueue<Type> _safequeue;
std::thread _runner;
bool _free_shutdown;
EventLoop()
: _runner(&EventLoop<Type>::_event_loop, this),
_free_shutdown(false)
{
}
virtual ~EventLoop() {
_safequeue.shutdown();
_runner.join();
}
// Run all events on the queue before exiting
void free_shutdown() {
_free_shutdown = true;
_safequeue.shutdown();
}
void enqueue(int timeout, Type element) {
_safequeue.enqueue(timeout, element);
}
// Mutex and condition variables are not movable and there is no need for smart pointers yet
EventLoop(const EventLoop&) = delete;
EventLoop& operator =(const EventLoop&) = delete;
EventLoop(const EventLoop&&) = delete;
EventLoop& operator =(const EventLoop&&) = delete;
void _event_loop() {
while (true) {
if ( _safequeue.dequeue() ) {
try {
Type call = _safequeue.get();
call();
}
catch (std::exception& error) {
std::cerr << "Unexpected exception on SafeQueue dequeue running: '" << error.what() << "'" << std::endl;
}
}
else {
break;
}
}
while (_free_shutdown) {
try {
Type call = _safequeue.get();
call();
}
catch (NoElements&) {
break;
}
catch (std::exception& error) {
std::cerr << "Unexpected exception on SafeQueue dequeue shutdown: '" << error.what() << "'" << std::endl;
}
}
}
};
const char* getTime(char* buffer, int size) {
#if defined( WIN32 )
SYSTEMTIME wlocaltime;
GetLocalTime(&wlocaltime);
::snprintf(buffer, size, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
duration -= hours;
auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
duration -= minutes;
auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
duration -= seconds;
auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
duration -= milliseconds;
time_t theTime = time( NULL );
struct tm* aTime = localtime( &theTime );
::snprintf(buffer, size, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
return buffer;
}
// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
char buffer[20];
std::cerr << getTime(buffer, sizeof buffer) << "Creating EventLoop" << std::endl;
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>();
std::cerr << getTime(buffer, sizeof buffer) << "Adding event element" << std::endl;
eventloop->enqueue( 3000, []{ char buffer[20]; std::cerr << getTime(buffer, sizeof buffer) << "Running task 3" << std::endl; } );
eventloop->enqueue( 1000, []{ char buffer[20]; std::cerr << getTime(buffer, sizeof buffer) << "Running task 1" << std::endl; } );
eventloop->enqueue( 2000, []{ char buffer[20]; std::cerr << getTime(buffer, sizeof buffer) << "Running task 2" << std::endl; } );
std::this_thread::sleep_for( std::chrono::duration<double, std::ratio<1, 1000>>(5000) );
delete eventloop;
std::cerr << getTime(buffer, sizeof buffer) << "Exiting after 10 seconds..." << std::endl;
return 0;
}
Пример выходного теста:
02:08:28.960 Creating EventLoop
02:08:28.960 Adding event element
02:08:29.960 Running task 1
02:08:30.961 Running task 2
02:08:31.961 Running task 3
02:08:33.961 Exiting after 10 seconds...
Это ответ для unix-подобных систем, таких как Linux или Mac OS X. Я не знаю, как это делается в Windows.
select () или pselect(). В Linux также есть poll().
Просмотрите страницы руководства для получения более подробной информации. Этим системным вызовам нужны списки файловых дескрипторов, тайм-аут и / или маска сигнала. Эти системные вызовы позволяют программе дождаться события. Если один из файловых дескрипторов в списке готов к чтению или записи (в зависимости от настроек, см. Справочные страницы), истекло время ожидания или поступил сигнал, эти системные вызовы вернутся. Затем программа может читать / записывать дескрипторы файлов, обрабатывать сигналы или делать другие вещи. После этого он снова вызывает (p)select/poll и ждет до следующего события.
Сокеты должны быть открыты как неблокирующие, чтобы функция чтения / записи возвращалась, когда нет данных / буфер заполнен. С общим сервером отображения X11 графический интерфейс обрабатывается через сокет и имеет файловый дескриптор. Так что с этим можно справиться точно так же.
Прежде чем создавать базовое приложение Event-loop в Python. Давайте разбираться
what is Event Loop ?
Цикл событий — это центральный компонент любой инфраструктуры асинхронного ввода-вывода, который позволяет вам выполнять операции ввода-вывода одновременно, не блокируя выполнение вашей программы. Цикл событий выполняется в одном потоке и отвечает за получение и отправку событий ввода-вывода (например, чтения/записи в файл или прерывания клавиатуры) по мере их возникновения.
import asyncio
async def coroutine():
print('Start')
await asyncio.sleep(1)
print('End')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())