Поделиться списком между процессами на сервере Python
У меня есть простой UDPServer, который работает с многопроцессорностью.
Я хочу создать список, который содержит информацию обо всех клиентах.
Я использую Manager, но я не понимаю, как добавить информацию в список - мне нужен объект Transfer Manager для обработки, но как? Мой путь с новым атрибутом не работает.
import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections
class ChatHandler(DatagramRequestHandler):
def handle(self):
cur_process = multiprocessing.current_process()
data = self.request[0].strip()
socket = self.request[1]
ChatHandler.clients.append(self.client_address) # error here
print(ChatHandler.clients)
class ChatServer(ForkingMixIn, UDPServer):
pass
if __name__ == '__main__':
server = ChatServer((host, port), ChatHandler)
ChatHandler.clients = multiprocessing.Manager().list()
server_process = multiprocessing.Process(target=server.serve_forever)
server_process.daemon = False
server_process.start()
Как это исправить? Спасибо!
Выход:
Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
self.finish_request(request, client_address)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
self.handle()
File "server.py", line 15, in handle
ChatHandler.clients.append(self.client_address)
File "<string>", line 2, in append
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
self._connect()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
c = SocketClient(address)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
5 ответов
Проблема в том, что вы позволяете основному процессу завершить свое выполнение сразу после запуска рабочего процесса. Когда процесс, который создал multiprocessing.Manager
заканчивает свое исполнение, то Manager
сервер отключается, что означает, что ваш объект общего списка теперь бесполезен. Это происходит потому, что Manager
Регистрирует объект это shutdown
функционировать как "финализатор" с multiprocessing
модуль, что означает, что он будет запущен непосредственно перед выходом из процесса. Вот код, который регистрирует его, в BaseManager.__init__
:
# register a finalizer
self._state.value = State.STARTED
self.shutdown = util.Finalize(
self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey,
self._state, self._Client),
exitpriority=0
)
Вот код, который фактически завершает работу:
@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
'''
Shutdown the manager process; will be registered as a finalizer
'''
if process.is_alive():
util.info('sending shutdown message to manager')
try:
conn = _Client(address, authkey=authkey)
try:
dispatch(conn, None, 'shutdown')
finally:
conn.close()
except Exception:
pass
process.join(timeout=1.0)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
util.info('trying to `terminate()` manager process')
process.terminate()
process.join(timeout=0.1)
if process.is_alive():
util.info('manager still alive after terminate')
state.value = State.SHUTDOWN
try:
del BaseProxy._address_to_local[address]
except KeyError:
pass
Исправить это просто - не позволяйте основному процессу завершиться сразу же после запуска процесса, который запускает сервер UDP, вызывая server_process.join()
:
import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections
class ChatHandler(DatagramRequestHandler):
def handle(self):
cur_process = multiprocessing.current_process()
data = self.request[0].strip()
socket = self.request[1]
ChatHandler.clients.append(self.client_address) # error here
print(ChatHandler.clients)
class ChatServer(ForkingMixIn, UDPServer):
pass
if __name__ == '__main__':
server = ChatServer((host, port), ChatHandler)
ChatHandler.clients = multiprocessing.Manager().list()
server_process = multiprocessing.Process(target=server.serve_forever)
server_process.daemon = False
server_process.start()
server_process.join() # This fixes the issue.
Если вы используете его в любом случае, как показано ниже, вам может потребоваться посмотреть длину списка, который вы передаете, или количество рабочих кодов в жестком коде, это может превышать возможности вашей машины:
pool = Pool(len(somelist))
# call the function 'somefunction' in parallel for each somelist.
pool.map(somefunction, somelist)
я сократил количество рабочих, это решило проблему для меня.
Ниже приведен пример UDP-сервера и общего списка.
родительский код создает менеджер, управляемый список и передает его
start_server()
эта функция, в свою очередь, фактически запускает сервер, сохраняя общий список, так что сервер и его обработчик могут получить к нему доступ
когда приходит пакет,
handle()
метод срабатывает. Это обращается к серверу, используяself.server
и общий список сself.server.client_list
атрибут в экземпляре ChatServer.
Я выполнил тестирование, запустив сервер, подождав секунду, а затем отправив пакет UDP "beer", используя netcat
команда. По какой-то причине он сначала посылает X, и каждый вывод дублируется. Это ошибка, но код должен указывать вам правильное направление.
источник
import multiprocessing as mp, signal, sys
from SocketServer import (
UDPServer, ForkingMixIn, DatagramRequestHandler
)
class ChatHandler(DatagramRequestHandler):
def handle(self):
data,_socket = self.request
curproc = mp.current_process()
print '{}: {}'.format(
curproc,
dict(
data_len=len(data),
data=data.strip(),
client=self.client_address,
))
self.server.client_list.append(
self.client_address)
print('{}: {}'.format(
curproc,
dict(client_list=self.server.client_list),
))
class ChatServer(ForkingMixIn, UDPServer):
client_list = None
def start_server(client_list):
server = ChatServer(('', 9876), ChatHandler)
server.client_list = client_list
server.serve_forever()
if __name__ == '__main__':
clist = mp.Manager().list()
mp.Process(
target=start_server, args=[clist],
name='udpserver',
).start()
signal.alarm(5) # die in 5 seconds
signal.pause() # wait for control-C or alarm
тестовый забег
(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Если вы не можете использовать менеджер по какой-либо причине, вы также можете реализовать его самостоятельно, который соответствует вашим потребностям.
Мой unittest был настроен так, чтобы останавливать все дочерние процессы, которые остались, если они не были должным образом завершены, как ожидалось, что разрушило диспетчер. Поэтому мне нужно было что-то, что можно было бы произвольно запускать и останавливать, не беспокоясь о тестах.
import multiprocessing
import atexit
import select
class SharedDict:
"""Share a dictionary across processes."""
def __init__(self):
"""Create a shared dictionary."""
super().__init__()
self.pipe = multiprocessing.Pipe()
self.process = None
atexit.register(self._stop)
self._start()
def _start(self):
"""Ensure the process to manage the dictionary is running."""
if self.process is not None and self.process.is_alive():
return
# if the manager has already been running in the past but stopped
# for some reason, the dictionary contents are lost
self.process = multiprocessing.Process(target=self.manage)
self.process.start()
def manage(self):
"""Manage the dictionary, handle read and write requests."""
shared_dict = dict()
while True:
message = self.pipe[0].recv()
logger.spam('SharedDict got %s', message)
if message[0] == 'stop':
return
if message[0] == 'set':
shared_dict[message[1]] = message[2]
if message[0] == 'get':
self.pipe[0].send(shared_dict.get(message[1]))
def _stop(self):
"""Stop the managing process."""
self.pipe[1].send(('stop',))
def get(self, key):
"""Get a value from the dictionary."""
return self.__getitem__(key)
def __setitem__(self, key, value):
self.pipe[1].send(('set', key, value))
def __getitem__(self, key):
self.pipe[1].send(('get', key))
# to avoid blocking forever if something goes wrong
select.select([self.pipe[1]], [], [], 0.1)
if self.pipe[1].poll():
return self.pipe[1].recv()
return None
def __del__(self):
self._stop()
shared_dict = SharedDict()
Вы можете расширить это с помощью всевозможных методов, и вы можете останавливать и перезапускать его, когда захотите (хотя dict каждый раз будет теряться). Каналы будут оставаться неизменными все время, поэтому все дочерние процессы также могут общаться с перезапущенным менеджером без необходимости в новых файловых файлах каналов.
Я мог бы расширить этот материал, добавив больше функций. Если я пока что не переместил этот класс в отдельный модуль, его можно найти по адресу https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py
Вы можете использовать собственной библиотекой Python.многопроцессорную обработку с
Или это:
import multiprocessing
manager = multiprocessing.Manager()
shared_list = manager.list()
def worker1(l):
l.append(1)
def worker2(l):
l.append(2)
process1 = multiprocessing.Process(
target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
target=worker2, args=[shared_list])
process1.start()
process2.start()
process1.join()
process2.join()
print shared_list