Поделиться списком между процессами на сервере Python

У меня есть простой UDPServer, который работает с многопроцессорностью.

Я хочу создать список, который содержит информацию обо всех клиентах.

Я использую Manager, но я не понимаю, как добавить информацию в список - мне нужен объект Transfer Manager для обработки, но как? Мой путь с новым атрибутом не работает.

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()

Как это исправить? Спасибо!

Выход:

Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

5 ответов

Решение

Проблема в том, что вы позволяете основному процессу завершить свое выполнение сразу после запуска рабочего процесса. Когда процесс, который создал multiprocessing.Manager заканчивает свое исполнение, то Manager сервер отключается, что означает, что ваш объект общего списка теперь бесполезен. Это происходит потому, что Manager Регистрирует объект это shutdown функционировать как "финализатор" с multiprocessing модуль, что означает, что он будет запущен непосредственно перед выходом из процесса. Вот код, который регистрирует его, в BaseManager.__init__:

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

Вот код, который фактически завершает работу:

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

Исправить это просто - не позволяйте основному процессу завершиться сразу же после запуска процесса, который запускает сервер UDP, вызывая server_process.join():

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join() # This fixes the issue.

Если вы используете его в любом случае, как показано ниже, вам может потребоваться посмотреть длину списка, который вы передаете, или количество рабочих кодов в жестком коде, это может превышать возможности вашей машины:

        pool = Pool(len(somelist))
        # call the function 'somefunction' in parallel for each somelist.
        pool.map(somefunction, somelist)

я сократил количество рабочих, это решило проблему для меня.

Ниже приведен пример UDP-сервера и общего списка.

  • родительский код создает менеджер, управляемый список и передает его start_server()

  • эта функция, в свою очередь, фактически запускает сервер, сохраняя общий список, так что сервер и его обработчик могут получить к нему доступ

  • когда приходит пакет, handle() метод срабатывает. Это обращается к серверу, используя self.serverи общий список с self.server.client_listатрибут в экземпляре ChatServer.

Я выполнил тестирование, запустив сервер, подождав секунду, а затем отправив пакет UDP "beer", используя netcat команда. По какой-то причине он сначала посылает X, и каждый вывод дублируется. Это ошибка, но код должен указывать вам правильное направление.

источник

import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data,_socket = self.request
        curproc = mp.current_process()
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data), 
                data=data.strip(),
                client=self.client_address,
            ))
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

тестовый забег

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}

Если вы не можете использовать менеджер по какой-либо причине, вы также можете реализовать его самостоятельно, который соответствует вашим потребностям.

Мой unittest был настроен так, чтобы останавливать все дочерние процессы, которые остались, если они не были должным образом завершены, как ожидалось, что разрушило диспетчер. Поэтому мне нужно было что-то, что можно было бы произвольно запускать и останавливать, не беспокоясь о тестах.

      import multiprocessing
import atexit
import select

class SharedDict:
    """Share a dictionary across processes."""
    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            logger.spam('SharedDict got %s', message)

            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


shared_dict = SharedDict()

Вы можете расширить это с помощью всевозможных методов, и вы можете останавливать и перезапускать его, когда захотите (хотя dict каждый раз будет теряться). Каналы будут оставаться неизменными все время, поэтому все дочерние процессы также могут общаться с перезапущенным менеджером без необходимости в новых файловых файлах каналов.

Я мог бы расширить этот материал, добавив больше функций. Если я пока что не переместил этот класс в отдельный модуль, его можно найти по адресу https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py

Вы можете использовать собственной библиотекой Python.многопроцессорную обработку с

Или это:

      import multiprocessing
manager = multiprocessing.Manager()
shared_list = manager.list()

def worker1(l):
    l.append(1)

def worker2(l):
    l.append(2)

process1 = multiprocessing.Process(
    target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
    target=worker2, args=[shared_list])

process1.start()
process2.start()
process1.join()
process2.join()

print shared_list
Другие вопросы по тегам