Как я могу написать сервер сокетов в другом потоке от моей основной программы (используя gevent)?

Я разрабатываю веб-сервер Flask/gevent WSGIserver, который должен взаимодействовать (в фоновом режиме) с аппаратным устройством через два сокета с использованием XML.

Один сокет инициируется клиентом (моим приложением), и я могу отправлять команды XML на устройство. Устройство отвечает на другой порт и отправляет обратно информацию, которую мое приложение должно подтвердить. Поэтому мое приложение должно прослушивать этот второй порт.

До сих пор я выполнял команду, открыл второй порт в качестве сервера, дождался ответа от устройства и закрыл второй порт.

Проблема в том, что возможно, что устройство отправляет несколько ответов, которые я должен подтвердить. Поэтому я решил оставить порт открытым и отвечать на входящие запросы. Однако, в конце концов, устройство завершило отправку запросов, и мое приложение все еще прослушивает (я не знаю, когда устройство будет выполнено), блокируя тем самым все остальное.

Это казалось идеальным вариантом использования для потока, так что мое приложение запускает сервер прослушивания в отдельном потоке. Поскольку я уже использую gevent в качестве сервера WSGI для Flask, я могу использовать гринлеты.

Проблема в том, что я искал хороший пример такой вещи, но все, что я могу найти, это примеры многопоточных обработчиков для сервера с одним сокетом. Мне не нужно обрабатывать много соединений на сокет-сервере, но мне нужно, чтобы он запускался в отдельном потоке, чтобы он мог прослушивать и обрабатывать входящие сообщения, в то время как моя основная программа может продолжать отправлять сообщения. Вторая проблема, с которой я сталкиваюсь, заключается в том, что на сервере мне нужно использовать некоторые методы из моего "основного" класса. Будучи относительно новым для Python, я не уверен, как его структурировать таким образом, чтобы это стало возможным.

class Device(object):

def __init__(self, ...):
    self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def _connect_to_device(self):
    print "OPEN CONNECTION TO DEVICE"
    try:
        self.clientsocket.connect((self.ip, 5100))
    except socket.error as e:
        pass

def _disconnect_from_device(self):
    print "CLOSE CONNECTION TO DEVICE"
    self.clientsocket.close()

def deviceaction1(self, ...):
    # the data that is sent is an XML document that depends on the parameters of this method.
    self._connect_to_device()
    self._send_data(XMLdoc)
    self._wait_for_response()
    return True

def _send_data(self, data):
    print "SEND:"
    print(data)
    self.clientsocket.send(data)

def _wait_for_response(self):
    print "WAITING FOR REQUESTS FROM DEVICE (CHANNEL 1)"
    self.serversocket.bind(('10.0.0.16', 5102))
    self.serversocket.listen(5)                         # listen for answer, maximum 5 connections
    connection, address = self.serversocket.accept()
    # the data is of a specific length I can calculate
    if len(data) > 0:
        self._process_response(data)
        self.serversocket.close()

def _process_response(self, data):
    print "RECEIVED:"
    print(data)
    # here is some code that processes the incoming data and
    # responds to the device
    # this may or may not result in more incoming data



if __name__ == '__main__':
    machine = Device(ip="10.0.0.240")
    Device.deviceaction1(...)

Это то, что я сейчас делаю (глобально, я не учел конфиденциальную информацию). Как видите, все последовательно. Если кто-то может предоставить пример прослушивающего сервера в отдельном потоке (предпочтительно с использованием гринлетов) и способ связи с прослушивающего сервера обратно в порождающий поток, это было бы очень полезно.

Благодарю.

РЕДАКТИРОВАТЬ: Попробовав несколько методов, я решил использовать метод Python по умолчанию select() для решения этой проблемы. Это сработало, поэтому мой вопрос об использовании потоков больше не актуален. Спасибо за людей, которые предоставили вклад за ваше время и усилия.

1 ответ

Надеюсь, что он может оказать некоторую помощь, в примере класса, если мы позвоним tenMessageSender функция затем запустит асинхронный поток без блокировки основного цикла, а затем _zmqBasedListener начнет слушать на отдельном порту, пока поток не будет активен. и какое бы сообщение наши tenMessageSender функция отправит, те будут получены клиентом и ответят на zmqBasedListener,

Сторона сервера

import threading
import zmq
import sys

class Example:
    def __init__(self):
        self.context = zmq.Context()
        self.publisher = self.context.socket(zmq.PUB)
        self.publisher.bind('tcp://127.0.0.1:9997')
        self.subscriber = self.context.socket(zmq.SUB)
        self.thread = threading.Thread(target=self._zmqBasedListener)

    def _zmqBasedListener(self):
        self.subscriber.connect('tcp://127.0.0.1:9998')
        self.subscriber.setsockopt(zmq.SUBSCRIBE, "some_key")
        while True:
            message = self.subscriber.recv()
            print message
        sys.exit()

    def tenMessageSender(self):
        self._decideListener()
        for message in range(10):
            self.publisher.send("testid : %d: I am a task" %message)

    def _decideListener(self):
        if not self.thread.is_alive():
            print "STARTING THREAD"
            self.thread.start()

клиент

import zmq
context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9997')
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9998')
subscriber.setsockopt(zmq.SUBSCRIBE, "testid")
count = 0
print "Listener"
while True:
    message = subscriber.recv()
    print message
    publisher.send('some_key : Message received %d' %count)
    count+=1

Вместо нити можно использовать гринлет и т. Д.

Другие вопросы по тегам