Как убедиться, что данные через сокеты UNX отправляются в порядке с использованием Twisted Python

С моей текущей настройкой я запускаю сервер с Django и я пытаюсь автоматизировать резервное копирование в облако при каждом действии POST/PUT. Чтобы обойти задержку (пинг до сервера колеблется около 100 мс, а действие может достигать более 10 отправленных элементов одновременно), я решил создать отдельную сущность с requests клиент и просто иметь эту ручку для всех функций резервного копирования.

Для этого у меня есть объект прослушивания через UNX с помощью twisted и я посылаю ему строку через нее всякий раз, когда сталкиваюсь с конечной точкой. Проблема, однако, заключается в том, что, если слишком много конечных точек вызывается одновременно или вызывается в быстрой последовательности, данные, отправляемые через сокет, перестают упорядочиваться. Есть ли способ предотвратить это? Код ниже:

UNX сервер:

class BaseUNXServerProtocol(LineOnlyReceiver):

    rest_client = RestClient()

    def connectionMade(self):
        print("UNIX Client connected!")

    def lineReceived(self, line):
        print("Line Received!")

    def dataReceived(self, data):
        string = data.decode("utf-8")
        jstring = json.loads(data)
        if jstring['command'] == "upload_object":
            self.rest_client.upload(jstring['model_name'], jstring['model_id'])

Unix Client:

class BaseUnixClient(object):

path = BRANCH_UNX_PATH
connected = False

def __init__(self):
    self.init_vars()
    self.connect()

def connect(self):
    if os.path.exists(self.path):
        self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.client.connect(self.path)
        self.connected = True
    else:
        print("Could not connect to path: {}".format(self.path))

def call_to_upload(self, model_class, model_id, upload_type):
        self.send_string(_messages.branch_upload_message(model_class, model_id, upload_type))

Конечная точка perform_create: (По сути, ловушка, которая вызывается всякий раз, когда новый объект помещается в POST)

def perform_create(self, serializer):
    instance = serializer.save()

    # Call for upload/notify
    UnixClient().call_to_upload(model_class=type(instance).__name__, model_id=instance.id, upload_type="create")

1 ответ

SOCK_STREAM соединения всегда заказываются. Данные об одном соединении поступают в том же порядке, в котором они были (или обрывы соединения).

Единственная очевидная проблема с кодом, которым вы поделились, заключается в том, что вы не должны переопределять dataReceived на LineOnlyReceiver подкласс. Вся ваша логика принадлежит lineReceived,

Это не вызовет проблем с данными не по порядку, но может привести к проблемам с кадрированием (например, при обработке частичных сообщений JSON или при комбинировании нескольких сообщений), которые могут вызвать json.loads поднять исключение.

Итак, чтобы ответить на ваш вопрос: данные доставляются в порядке. Если вы видите неупорядоченную операцию, это потому, что данные отправляются в другом порядке, чем вы ожидаете, или потому, что существует расхождение между порядком доставки данных и порядком наблюдаемых побочных эффектов. Я не вижу способа поставить дополнительную диагностику, не увидев больше вашего кода.

Видя код отправки, проблема в том, что вы используете новое соединение для каждого perform_create операция. Там нет гарантии о доставке заказа через разные соединения. Даже если ваша программа делает:

  • установить соединение
  • отправить данные по соединению
  • установить соединение б
  • отправить данные по соединению b
  • тесное соединение
  • закрыть соединение б

Получатель может принять решение обработать данные о соединении b перед данными о соединении a. Это потому, что базовая система уведомления о событиях (select, epoll_waitи т. д.), насколько я знаю, не хранит информацию о порядке событий, о которых он сообщает. Вместо этого результаты появляются в псевдослучайном порядке или в скучном детерминированном порядке (например, по возрастанию по номеру дескриптора файла).

Чтобы исправить проблему заказа, сделайте один UnixClient и использовать его для всех ваших perform_create звонки.

Другие вопросы по тегам