Структура данных для UDP-сервера, анализирующего объекты JSON в python

У меня есть UDP-сервер, прослушивающий входящий трафик в потоке. Сообщения приходят с внешнего устройства в формате JSON, например, {"_id": "0x00", "status": "on"}. Эта информация должна быть проанализирована обработчиком UDP и сохранена в виде набора объектов (или обновлена, если _id существует). В настоящий момент я могу получать и анализировать JSON, но не совсем уверен, как хранить эти данные или обрабатывать их правильно. Я хотел бы использовать Очередь в UDP-Обработчике и отдельный Обработчик Msg, который обрабатывает очередь, но не уверен, является ли это правильным способом сделать это.

Обратите внимание: я переписал код без синтаксиса и т. Д.

# I create a node server thread in the main of my program with IP/PORT

class NodeServer(threading.Thread):
    def __init__():
        self.server = UDPServer((address, port), UDPHandler)

    def run():
        self.server.serve_forever()

class UDPServer(socketserver.ThreadingUDPServer):
    allow_reuse_address = True

class UDPHandler(socketserver.BaseRequestHandler):
    data_q = queue.Queue()

    handler = NodeHandler(data_q)
    handler.start()

    def handle():
        try:
            # receive the message from client
            data = self.request[0].decode("UTF-8")

            # check if it's in json format

            # HANDLE THE MESSAGE
            self.data_q.put(data)

            # stop the thread
            self.handler.join()
            # send an "ACK" msg back to the client
      except: Exception as e:
            #handle

class NodeHandler(threading.Thread):
    table = NodeTable()

    def __init(data_q):
        self.data_q = data_q

    def handle():
      # get the string message from the queue (filled by the UDPHandler)
      msg = self.data_q.get()

      # check if the "_id" field exists in the current node table

      # if it exists in the table, find it, update the fields from json

      # otherwise, if it's a new "_id": create a new node
      json_msg = json.parse(msg)
      node = Node(json_msg["_id"])
      # set other node parameters from the json object, status etc

      # update the node table with the new information from the message
      table.put(nd)
      #
    def join():
        # join thread

class NodeTable():
  # the table is a dictionary with ID and a Node object, ie {0x01: Node}
  table = {}
  def put(_id, node):
      self.table[_id] = node

  def get(_id):
      return self.table[_id]

class Node():
    id = 0
    def __init__(id):
        self.id = id
        # new node
    # other node functions
  • Должен ли NodeHandler быть создан в обработчике UDP как отдельный поток или просто как объект-обработчик?
  • Должна ли таблица узлов быть глобальной, если нет, как получить к ней доступ за пределами сервера? Возможно, просто передать его как объект.
  • Является ли NodeTable хорошей структурой данных для хранения уникальных объектов-узлов, или есть лучший способ?

Спасибо!

1 ответ

Решение

1) Вы можете реализовать сервер UDP, который намного проще обрабатывает входящие сообщения в бесконечном цикле с помощью следующих строк кода:

 import socket

 def udp_server(udp_ip, udp_port, ...):
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   sock.bind((upd_ip, upd_port))
   while True:
     data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
     ...process data...

См. https://wiki.python.org/moin/UdpCommunication для получения дополнительной информации и обсуждения.

Аргументы udp_server IP-адрес и порт udp, а также любые другие структуры данных, с которыми сервер должен взаимодействовать.

И начинать это с собственного потока очень легко:

 import threading

 t = threading.Thread(target = udp_server, args = (...))
 t.start()

2) Класс NodeTable - это просто оболочка вокруг словаря Python, но, похоже, вы хотите, чтобы к нему одновременно обращались несколько потоков. В этом случае вы должны прочитать этот SO ответ: (ссылка).

В зависимости от того, что другие потоки, кроме потока сервера, могут делать со словарем узла, может потребоваться или не потребоваться блокировка.

Подводя итог, где я мог бы написать код:

 def main():
   nodes = {}     # use a simple dict for storing the nodes
   lock = RLock() # if you need this
   # pass nodes and lock to server thread and start it
   t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock))
   t.start() 
   ...

В этот момент сервер udp работает, и основной поток может получить доступ к таблице узлов через переменную. nodes,

Нужно ли информировать основной поток о добавлении новых узлов в таблицу узлов? Тогда, возможно, Очередь - это то, что вы хотите. Вы бы 1) создать его в main() и 2) передать его udp_server:

 def main()
   nodes = {}     # use a simple dict for storing the nodes
   lock = RLock() # if you need this
   q = Queue()    # create a Queue and pass it to the udp server
   # pass nodes and lock to server thread and start it
   t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock, q))
   t.start() 
   # process entries from the Queue
   while True:
     item = q.get()
     ... process item...

и в функции сервера udp ...process data... поместит что-то в очередь:

   while True:
     data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
     ...json decode, etc. ...
     q.put(...) 
Другие вопросы по тегам