Как справиться с асинхронностью внутри класса в Python, ничего не блокируя?

Мне нужно создать класс, который может получать и хранить SMTP-сообщения, т.е. электронные письма. Для этого я использую asyncore в соответствии с примером, размещенным здесь. Тем не мение, asyncore.loop() блокирует, поэтому я не могу ничего сделать в коде.

Поэтому я подумал об использовании потоков. Вот пример кода, который показывает, что я имею в виду:

class MyServer(smtpd.SMTPServer):
    # derive from the python server class

    def process_message(..):
        # overwrite a smtpd.SMTPServer method to be able to handle the received messages
        ...
        self.list_emails.append(this_email)

    def get_number_received_emails(self):
        """Return the current number of stored emails"""
        return len(self.list_emails)


    def start_receiving(self):
        """Start the actual server to listen on port 25"""

        self.thread =   threading.Thread(target=asyncore.loop)
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver from itself
        self.close()
        self.thread.join()

Я надеюсь, что вы получите картину. Класс MyServer должен иметь возможность запускать и останавливать прослушивание порта 25 неблокирующим способом, иметь возможность запрашивать сообщения во время прослушивания (или нет). start метод запускает asyncore.loop() слушатель, который, когда происходит получение электронного письма, добавляется во внутренний список. Похоже, stop Метод должен иметь возможность остановить этот сервер, как предлагается здесь.

Несмотря на то, что этот код работает не так, как я ожидаю (asyncore, кажется, работает вечно, даже я называю выше stop метод. error Я поднимаю пойман внутри stop, но не в пределах target функция, содержащая asyncore.loop()), Я не уверен, что мой подход к проблеме имеет смысл. Будем благодарны за любые предложения по исправлению вышеуказанного кода или предложению более надежной реализации (без использования стороннего программного обеспечения).

3 ответа

Решение

Предоставленное решение может быть не самым сложным, но оно работает разумно и было протестировано.

Прежде всего, дело с asyncore.loop() является то, что он блокирует, пока все asyncore каналы закрыты, как указал пользователь Wessie в комментарии ранее. Ссылаясь на пример smtp, упомянутый ранее, получается, что smtpd.SMTPServer наследуется от asyncore.dispatcher (как описано в документации smtpd), которая отвечает на вопрос, какой канал должен быть закрыт.

Поэтому на исходный вопрос можно ответить следующим обновленным примером кода:

class CustomSMTPServer(smtpd.SMTPServer):
    # store the emails in any form inside the custom SMTP server
    emails = []
    # overwrite the method that is used to process the received 
    # emails, putting them into self.emails for example
    def process_message(self, peer, mailfrom, rcpttos, data):
        # email processing


class MyReceiver(object):
    def start(self):
        """Start the listening service"""
        # here I create an instance of the SMTP server, derived from  asyncore.dispatcher
        self.smtp = CustomSMTPServer(('0.0.0.0', 25), None)
        # and here I also start the asyncore loop, listening for SMTP connection, within a thread
        # timeout parameter is important, otherwise code will block 30 seconds after the smtp channel has been closed
        self.thread =  threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} )
        self.thread.start()     

    def stop(self):
        """Stop listening now to port 25"""
        # close the SMTPserver to ensure no channels connect to asyncore
        self.smtp.close()
        # now it is save to wait for the thread to finish, i.e. for asyncore.loop() to exit
        self.thread.join()

    # now it finally it is possible to use an instance of this class to check for emails or whatever in a non-blocking way
    def count(self):
        """Return the number of emails received"""
        return len(self.smtp.emails)        
    def get(self):
        """Return all emails received so far"""
        return self.smtp.emails
    ....

Итак, в конце концов, у меня есть start и stop метод запуска и остановки прослушивания на порту 25 в неблокирующей среде.

Исходя из другого вопроса, asyncore.loop не завершается, когда больше нет соединений

Я думаю, что вы немного задумались над тем, что делать. Используя код из другого вопроса, вы можете начать новый поток, который запускает asyncore.loop следующим фрагментом кода:

import threading

loop_thread = threading.Thread(target=asyncore.loop, name="Asyncore Loop")
# If you want to make the thread a daemon
# loop_thread.daemon = True
loop_thread.start()

Это запустит его в новом потоке и продолжит, пока все asyncore каналы закрыты.

Вместо этого вы должны рассмотреть возможность использования Twisted. http://twistedmatrix.com/trac/browser/trunk/doc/mail/examples/emailserver.tac демонстрирует, как настроить SMTP-сервер с настраиваемым подключением при доставке.

Ответ Алекса - лучший, но был неполным для моего варианта использования. Я хотел протестировать SMTP как часть модульного теста, который подразумевал построение поддельного SMTP-сервера внутри моих тестовых объектов, и сервер не прерывал поток asyncio, поэтому мне пришлось добавить строку, чтобы установить его в поток демона, чтобы позволить остальным модульный тест, чтобы завершить, не блокируя ожидание того потока asyncio, чтобы присоединиться. Я также добавил в полное ведение журнала всех данных электронной почты, чтобы я мог утверждать все, что отправлено через SMTP.

Вот мой поддельный класс SMTP:

class TestingSMTP(smtpd.SMTPServer):
    def __init__(self, *args, **kwargs):
        super(TestingSMTP, self).__init__(*args, **kwargs)
        self.emails = []

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        msg = {'peer': peer,
               'mailfrom': mailfrom,
               'rcpttos': rcpttos,
               'data': data}
        msg.update(kwargs)
        self.emails.append(msg)


class TestingSMTP_Server(object):

    def __init__(self):
        self.smtp = TestingSMTP(('0.0.0.0', 25), None)
        self.thread = threading.Thread()

    def start(self):
        self.thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.smtp.close()
        self.thread.join()

    def count(self):
        return len(self.smtp.emails)

    def get(self):
        return self.smtp.emails

А вот как это называется классами unittest:

smtp_server = TestingSMTP_Server()
smtp_server.start()

# send some emails

assertTrue(smtp_server.count() == 1) # or however many you intended to send
assertEqual(self.smtp_server.get()[0]['mailfrom'], 'first@fromaddress.com')

# stop it when done testing
smtp_server.stop()

В случае, если кому-то еще нужно это немного прояснить, вот что я в итоге использовал. Здесь используется smtpd для почтового сервера и smtpblib для почтового клиента с Flask в качестве http-сервера [суть]:

app.py

from flask import Flask, render_template
from smtp_client import send_email
from smtp_server import SMTPServer

app = Flask(__name__)

@app.route('/send_email')
def email():
  server = SMTPServer()
  server.start()
  try:
    send_email()
  finally:
    server.stop()
  return 'OK'

@app.route('/')
def index():
  return 'Woohoo'

if __name__ == '__main__':
  app.run(debug=True, host='0.0.0.0')

smtp_server.py

# smtp_server.py
import smtpd
import asyncore
import threading

class CustomSMTPServer(smtpd.SMTPServer):
  def process_message(self, peer, mailfrom, rcpttos, data):
    print('Receiving message from:', peer)
    print('Message addressed from:', mailfrom)
    print('Message addressed to:', rcpttos)
    print('Message length:', len(data))
    return

class SMTPServer():
  def __init__(self):
    self.port = 1025

  def start(self):
    '''Start listening on self.port'''
    # create an instance of the SMTP server, derived from  asyncore.dispatcher
    self.smtp = CustomSMTPServer(('0.0.0.0', self.port), None)
    # start the asyncore loop, listening for SMTP connection, within a thread
    # timeout parameter is important, otherwise code will block 30 seconds
    # after the smtp channel has been closed
    kwargs = {'timeout':1, 'use_poll': True}
    self.thread = threading.Thread(target=asyncore.loop, kwargs=kwargs)
    self.thread.start()

  def stop(self):
    '''Stop listening to self.port'''
    # close the SMTPserver to ensure no channels connect to asyncore
    self.smtp.close()
    # now it is safe to wait for asyncore.loop() to exit
    self.thread.join()

  # check for emails in a non-blocking way
  def get(self):
    '''Return all emails received so far'''
    return self.smtp.emails

if __name__ == '__main__':
  server = CustomSMTPServer(('0.0.0.0', 1025), None)
  asyncore.loop()

smtp_client.py

import smtplib
import email.utils
from email.mime.text import MIMEText

def send_email():
  sender='author@example.com'
  recipient='6142546977@tmomail.net'

  msg = MIMEText('This is the body of the message.')
  msg['To'] = email.utils.formataddr(('Recipient', recipient))
  msg['From'] = email.utils.formataddr(('Author', 'author@example.com'))
  msg['Subject'] = 'Simple test message'

  client = smtplib.SMTP('127.0.0.1', 1025)
  client.set_debuglevel(True) # show communication with the server
  try:
    client.sendmail('author@example.com', [recipient], msg.as_string())
  finally:
    client.quit()

Затем запустите сервер с python app.py а в другом запросе имитируйте запрос к /send_email с участием curl localhost:5000/send_email. Обратите внимание, что для фактической отправки электронной почты (или sms) вам нужно перепрыгнуть через другие обручи, подробно описанные здесь: https://blog.codinghorror.com/so-youd-like-to-send-some-email-through-code/.

Другие вопросы по тегам