Massconnect потеря атрибута сокета

Я бегу massconnect.py модуль для имитации открытия многих параллельных подключений через веб-сокет, но когда я пытаюсь открыть 15000 подключений (указано в massconnect json), через некоторое время я получаю следующее сообщение об ошибке:

Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\reactor.py", line 120, in _callEventCallback
    evt.callback(rc, bytes, evt)
  File "C:\Python27\lib\site-packages\twisted\internet\iocpreactor\tcp.py", line 285, in cbConnect
    self.socket.setsockopt(
exceptions.AttributeError: 'Client' object has no attribute 'socket'

Открытие соединений через веб-сокет прерывается, и я получаю это исключение.

Я адаптировал свой massconnect.py модуль для подтверждения подлинности веб-сокета для приложения. Я проверяю, чтобы каждый раз, когда клиент выделялся, он отправлял уникальное сообщение аутентификации.

Я уже поставил MaxUserPort в реестре до 65534. Я думал, что эта проблема может привести к нехватке памяти, но после проверки на разных машинах я понял, что это не причина.

Это может быть известная проблема с питоном / витой?

@Glyph:

Вот исходный код massconnect.py:

'''
Created on Jan 29, 2013

@author: boro.petrovic
'''
###############################################################################
##
##  Copyright 2011,2012 Tavendo GmbH
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

import sys
import time
from twisted.internet import reactor
from twisted.internet import error
from twisted.internet.defer import Deferred, returnValue, inlineCallbacks
from twisted.internet.interfaces import IReactorTime
from autobahn.websocket import connectWS
from autobahn.websocket import WebSocketClientFactory, WebSocketClientProtocol
import time

f='start'

class MassConnectProtocol(WebSocketClientProtocol):   

    def sendHello(self, message):
        #self.sendMessage("Hello from Python!")
        self.sendMessage(payload=message)
        #reactor.callLater(2, self.sendHello(message='test'))

    def onOpen(self):
        global f
        wstoken = f.readline().rstrip()
        message = 'messagecontent'+wstoken
        self.factory.test.onConnected()
        self.sendHello(message)
        self.wstoken = wstoken

    def onMessage(self, msg, binary):
        print "Got message from wstoken " + self.wstoken + " :" + msg

class MassConnectFactory(WebSocketClientFactory):

    def clientConnectionFailed(self, connector, reason):
        if self.test.onFailed():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

    def clientConnectionLost(self, connector, reason):
        if self.test.onLost():
            reactor.callLater(float(self.retrydelay)/1000., connector.connect)

class MassConnect:

    def __init__(self, name, uri, connections, batchsize, batchdelay, retrydelay):
        self.name = name
        self.uri = uri
        self.batchsize = batchsize
        self.batchdelay = batchdelay
        self.retrydelay = retrydelay
        self.failed = 0
        self.lost = 0
        self.targetCnt = connections
        self.currentCnt = 0
        self.actual = 0

    def run(self):
        self.d = Deferred()
        self.started = time.clock()
        self.connectBunch()
        return self.d

    def onFailed(self):
        self.failed += 1
        return True

    def onLost(self):
        self.lost += 1
        return True

    def onConnected(self):
        #sprint "connect"
        self.actual += 1
        if self.actual % self.batchsize == 0:
            sys.stdout.write(".")
        if self.actual == self.targetCnt:
            self.ended = time.clock()
            duration = self.ended - self.started
            print " connected %d clients to %s at %s in %s seconds (retries %d = failed %d + lost %d)" % (self.currentCnt, self.name, self.uri, duration, self.failed + self.lost, self.failed, self.lost)
            try:
                reactor.run()
            except twisted.internet.error.ReactorAlreadyRunning:
                pass

            result = {'name': self.name,
                  'uri': self.uri,
                  'connections': self.targetCnt,
                  'retries': self.failed + self.lost,
                  'lost': self.lost,
                  'failed': self.failed,
                  'duration': duration}
            self.d.callback(result)

    def connectBunch(self):

        if self.currentCnt + self.batchsize < self.targetCnt:
            c = self.batchsize
            redo = True
        else:
            c = self.targetCnt - self.currentCnt
            redo = False
        for i in xrange(0, c):
            factory = MassConnectFactory(self.uri, origin=None, protocols=[("myprotocol")])
            factory.test = self
            factory.retrydelay = self.retrydelay
            factory.protocol = MassConnectProtocol
            factory.setProtocolOptions(version=13)
            factory.setSessionParameters(url="myurl", origin=None, protocols=["myprotocol"])
            connectWS(factory)
            self.currentCnt += 1
        if redo:
            reactor.callLater(float(self.batchdelay)/1000., self.connectBunch)

class MassConnectTest:

    def __init__(self, spec):
        global f
        f = open("C:/tokens.txt", "r")
        self.spec = spec

    @inlineCallbacks
    def run(self):
        global f
        res = []
        for s in self.spec['servers']:
            t = MassConnect(s['name'],
                            s['uri'],
                            self.spec['options']['connections'],
                            self.spec['options']['batchsize'],
                            self.spec['options']['batchdelay'],
                            self.spec['options']['retrydelay'])
            r = yield t.run()
            res.append(r)
        returnValue(res)
        f.close()

В скрипте я заменил конфиденциальные данные фиктивными строками.

Из файла token.txt я читаю токены аутентификации для каждого из 15000 пользователей, подключающихся к тестируемому приложению через веб-сокет. Таким образом, я моделирую соединение множества разных пользователей, и для каждого из них у меня есть активное независимое соединение через веб-сокет.

К сожалению, я не достиг точки, где у меня установлены все 15000 соединений, потому что во время выполнения скрипта (создания соединений) я получил сообщение об ошибке Unhandled Error.

1 ответ

Было бы полезно, если бы вы сократили massconnect.py на SSCCE. Однако, если проблема возникает только на уровне нескольких тысяч одновременных подключений, вероятно, у вас заканчиваются файловые дескрипторы (извините, сообщение об ошибке Twisted для этого невелико).

Посмотрите этот вопрос для получения дополнительной информации об изменении этого параметра в вашей операционной системе.

Другие вопросы по тегам