Как решить проблему с потоками в ZeroMQ + Ruby?

Наткнуться на чтение ZeroMQ FAQ о безопасности потоков.

Моя многопоточная программа продолжает зависать в странных местах внутри библиотеки ZeroMQ. Что я делаю неправильно?

Розетки ZeroMQ не являются поточно-ориентированными. Это подробно описано в Руководстве.

Короче говоря, сокеты не должны быть разделены между потоками. Мы рекомендуем создать выделенный сокет для каждого потока.

В тех ситуациях, когда выделенный сокет для каждого потока невозможен, сокет может использоваться совместно, если и только если каждый поток выполняет полный барьер памяти перед доступом к сокету. Большинство языков поддерживают Mutex или Spinlock, которые будут выполнять полный барьер памяти от вашего имени.

Моя многопоточная программа продолжает зависать в странных местах внутри библиотеки ZeroMQ.
Что я делаю неправильно?

Ниже приведен мой следующий код:

Celluloid::ZMQ.init
module Scp
    module DataStore
    class DataSocket
        include Celluloid::ZMQ 
            def pull_socket(socket)
                @read_socket = Socket::Pull.new.tap do |read_socket|
                    ## IPC socket
                    read_socket.connect(socket)
                end
            end

            def push_socket(socket)
                @write_socket = Socket::Push.new.tap do |write_socket|
                    ## IPC socket
                    write_socket.connect(socket)
                end
            end

            def run
                pull_socket and push_socket and loopify!
            end

            def loopify!
                loop {
                   async.evaluate_response(read_socket.read_multipart)
                }
            end

            def evaluate_response(data)
                return_response(message_id,routing,Parser.parser(data))
            end

            def return_response(message_id,routing,object)
                data = object.to_response
                write_socket.send([message_id,routing,data])
            end
        end
    end
end  

DataSocket.new.run 

Теперь есть пара вещей, которые мне неясны:

1) Предполагая, что async порождает новый Thread (каждый раз) и write_socket распределяется между всеми потоками, и ZeroMQ говорит, что их сокет не является потокобезопасным. Я конечно вижу write_socket натыкаясь на проблему безопасности темы.
(Кстати, до сих пор не сталкивался с этой проблемой во всем сквозном тестировании.)

Вопрос 1: Правильно ли мое понимание?

Чтобы решить эту проблему, ZeroMQ просит нас добиться этого, используя Mutex, Semaphore.

Какие результаты в вопросе 2

2) Переключение контекста.

Данное потоковое приложение может переключать контекст в любое время. Глядя на код ffi-rzmq Celluloid::ZMQ.send() внутренне вызывает send_strings (), который внутренне вызывается send_multiple()

Вопрос 2: Переключение контекста может происходить (где угодно) внутри (даже в критической секции) (здесь) [ https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L510%5D

Это также может привести к проблеме с упорядочением данных.

Правильно ли мое следующее наблюдение?

Замечания:

Operating system ( MacOS, Linux and CentOS )  
Ruby - MRI 2.2.2/2.3.0

2 ответа

Решение

Никто не должен рисковать надежностью приложения, помещая его на тонкий лед

Простите эту историю, чтобы она читалась довольно долго, но пожизненный опыт авторов показывает, что причины гораздо важнее, чем любые несколько SLOC (потенциально сомнительных, мистически выглядящих или невежественных) попыток экспериментально найти, как

Начальная нота

В то время как ZeroMQ в течение нескольких десятилетий рекламировался как Zero-Sharing ( Zero-Blocking, (почти)-Zero-Latency и еще несколько макетов дизайна). Лучшее место для чтения о плюсах и минусах - это книги Питера ХИНТЖЕНСА, а не только Невероятная философия "Code Connected, Volume 1", а также философия передового проектирования и проектирования в реальных социальных областях), в самой последней документации по API была представлена ​​и рекламируется некоторая функциональность IMHO с непринужденным отношением к этим краеугольным принципам распределенных вычислений. Дабы не такой резкий свист на Zero-Sharing такой громкий. Тем не менее, я все еще остаюсь парнем с нулевым обменом, поэтому просим вас рассмотреть этот пост в этом свете.

Ответ 1:
Нет, сэр. - или лучше - да и нет, сэр.

ZeroMQ не просит использовать барьеры Mutex/Semaphore. Это противоречит принципам дизайна ZeroMQ.

Да, недавние изменения API начали упоминать, что (при некоторых дополнительных условиях) можно начать использовать общие сокеты... со (многими) дополнительными мерами... так что последствия были обратными. Если кто-то "хочет", он также предпринимает все дополнительные шаги и меры (и оплачивает все изначально скрытые затраты на проектирование и реализацию для того, чтобы "позволить" общим игрушкам (надеюсь) пережить основную (ненужную) битву с остальными неконтролируемая среда распределенной системы, которая, таким образом, внезапно также несет риск неудачи (что по многим разумным причинам не относится к первоначальной евангелизации с нулевым совместным использованием ZeroMQ), поэтому пользователь решает, какой путь выбрать..)

Надежный и надежный дизайн ИМХО все еще лучше развивался в соответствии с первоначальным API ZeroMQ и евангелизацией, где разделение нуля было принципом.

Ответ 2:
В конструкции всегда есть принципиальная неопределенность в отношении упорядочения потоков данных ZeroMQ, одна из максим проекта ZeroMQ не позволяет разработчикам полагаться на неподдерживаемые предположения при упорядочении сообщений и многие другие (применяются исключения). Существует лишь уверенность в том, что любое сообщение, отправленное в инфраструктуру ZeroMQ, либо доставляется как полное сообщение, либо не доставляется вообще. Таким образом, можно быть уверенным только в том факте, что фрагментированные обломки никогда не появляются при доставке. Для более подробной информации читайте ниже.


ThreadId ничего не доказывает (если inproc использовался транспортный класс)

Учитывая внутреннюю конструкцию двигателей ZeroMQ с накачкой данных, создание
zmq.Context( number_of_IO_threads ) решает, сколько потоков будет создано для обработки будущих потоков данных. Это может быть где угодно { 0, 1: default, 2, .. }, вплоть до почти полного исчерпания фиксированного в ядре максимального количества потоков. Значение 0 дает разумный выбор не тратить ресурсы в том случае, если inproc:// транспортный класс на самом деле является обработанной потоком данных областью прямой памяти, которая фактически никогда не течет и не прибивается непосредственно к посадочной площадке приемной абстракции сокета:o)), и никакой поток для такой работы никогда не требуется,
Рядом с этим <aSocket>.setsockopt( zmq.AFFINITY, <anIoThreadEnumID#> ) позволяет точно настроить связанную с данными IO-"гидравлику", чтобы расставить приоритеты, балансировать нагрузку, настроить производительность нагрузки потока на перечисленный пул zmq.Context() -Instance IO-потоки и выгоды от лучших и лучших настроек в перечисленных выше аспектах дизайна и операций с потоками данных.


Краеугольным элементом является Context() пример,
не Socket() один

Когда Context() Экземпляр был создан и сконфигурирован (см. выше, почему и как), он (почти) свободен для совместного использования (если дизайн не может устоять перед совместным использованием или ему необходимо избегать установки полноценных распределенных вычислений). инфраструктура).

Другими словами, мозг всегда находится внутри zmq.Context() экземпляр - все dFSA-движки, связанные с сокетами, настроены / настроены / работают там (да, даже если синтаксис <aSocket>.setsockopt(...) эффект такого реализуется внутри мозга - в соответствующем zmq.Context - не в каком-то проводе от А до Б.

Лучше никогда не делиться <aSocket> (даже если API-4.2.2+ обещает, что вы могли бы)

До сих пор можно было увидеть много фрагментов кода, в которых ZeroMQ Context и его сокеты создаются и удаляются в одно мгновение, обслуживая лишь несколько SLOC-ов подряд, но - это не значит, что такие практика является мудрой или скорректированной в соответствии с любыми другими потребностями, кроме этого очень академического примера (который был сделан просто для того, чтобы быть напечатанным в как можно меньшем количестве SLOC из-за политики издателя книги).

Даже в таких случаях справедливое предупреждение о действительно огромных затратах на zmq.Context Должна присутствовать настройка / демонтаж инфраструктуры, чтобы избежать обобщения, за исключением копий / вставок такого кода, который использовался кратковременно только для таких иллюстративных целей.

Представьте себе реалистичные установки, необходимые для любого Context экземпляр - чтобы подготовить пул соответствующих механизмов dFSA, поддерживая все их соответствующие настройки конфигурации, а также все относящиеся к транспортному классу аппаратные средства, относящиеся к транспортному классу сокетов и конечных точек, + внешние обработчики O/S-услуг, циклические сканеры событий выделение буферных пулов памяти + их динамические распределители и т. д. и т. д. Все это требует как времени, так и ресурсов O / S, поэтому обрабатывайте эти (естественные) затраты разумно и с осторожностью для скорректированных накладных расходов, если производительность не пострадает.

Если вы все еще сомневаетесь, зачем упоминать об этом, просто представьте, если кто-то будет настаивать на том, чтобы разорвать все кабели локальной сети сразу после отправки пакета, и будет вынужден ждать, пока новая кабельная система не будет установлена ​​прямо перед тем, как отправлять следующий пакет. появляется. Надеюсь, что эта точка зрения "разумной инстанциации" теперь может быть лучше воспринята и аргумент, чтобы поделиться (если вообще) zmq.Context() -экземпляры, без каких-либо дальнейших попыток совместного использования экземпляров сокетов ZeroMQ (даже если они сами по себе становятся (почти) поточно-ориентированными).

Философия ZeroMQ является надежной, если принять ее за продвинутый дизайн для высокопроизводительных распределенных вычислительных инфраструктур. Настройка только одного (второстепенного) аспекта, как правило, не корректирует все усилия и затраты, как в глобальном представлении о том, как проектировать безопасные и производительные системы, результат не будет улучшен ни на шаг (и даже абсолютно не подверженный риску риск). бесплатные (если это когда-либо возможно) экземпляры сокетов не изменят этого, тогда как все преимущества звукового дизайна, чистого кода и разумно достижимой возможности тестирования и отладки будут потеряны), если будет изменена только одна эта деталь - так вместо этого потяните другой провод от существующего мозга к такому новому потоку или оснастите новый поток собственным мозгом, который будет локально обрабатывать свои ресурсы и позволять ему подключать собственные провода ко всем другим мозгам - по мере необходимости для связи с - в распределенной системе).

Если вы все еще сомневаетесь, попробуйте представить, что случилось бы с вашей национальной олимпийской хоккейной командой, если бы она разделила всего одну хоккейную клюшку во время турнира. Или как бы вы хотели, если бы все соседи в вашем родном городе использовали один и тот же телефонный номер для ответа на все входящие звонки (да, с одновременным звонком на все телефоны и мобильные телефоны, с одним и тем же номером). Насколько хорошо это будет работать?


Привязки к языку не обязательно должны отражать все доступные API-функции.

Здесь можно поднять, а в некоторых случаях и правильно, что не все языковые привязки ZeroMQ или все популярные оболочки-оболочки хранят все детали API, предоставляемые пользователю для программирования на уровне приложений (автор этого поста долго боролся с такими унаследованными конфликтами, которые оставались неразрешимыми по этой причине, и ему приходилось много чесать голову, чтобы найти какой-либо реальный способ обойти этот факт - так что это (почти) всегда выполнимо


Эпилог:

Справедливо отметить, что в последних версиях ZeroMQ API 4.2.2+ начали распространяться первоначальные принципы евангелизации.

Тем не менее, стоит вспомнить тревожное воспоминание Мори

(ударение добавлено, заглавные буквы нет)

Поток безопасности

ØMQ имеет как резьбонарезной тип сокета, так и не резьбонащищенный тип сокета Приложения НЕ ДОЛЖНЫ использовать небезопасный сокет из нескольких потоков, за исключением случаев миграции сокета из одного потока в другой с барьером памяти "полный забор".

Ниже приведены резьбовые безопасные розетки: * ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER

Несмотря на то, что этот текст может показаться многим уместным, создание барьеров для обслуживания - худшее, что можно сделать при разработке современных распределенных вычислительных систем, где производительность является обязательным условием.

Последнее, что хотелось бы увидеть, - это заблокировать собственный код, поскольку такой агент попадает в принципиально неконтролируемое состояние блокировки, откуда никто не может его вывести (ни сам агент внутри, ни кто-либо извне), в случае, если удаленный агент никогда не доставляет ожидаемое событие (которое в распределенных системах может произойти по очень многим причинам или при стольких обстоятельствах, которые находятся вне его контроля).

Построение системы, которая склонна зависать (с широкой улыбкой поддерживаемой (но наивно используемой) синтаксической возможности), на самом деле ничего радостного не делает, тем более серьезная дизайнерская работа.

Также не стоит удивляться тому, что многие дополнительные (изначально не видимые) ограничения применяются по линии новых ходов в использовании shared-{ hockey-stick | телефоны} API:

ZMQ_CLIENT розетки безопасны Они не принимают ZMQ_SNDMORE опция на отправку не ZMQ_RCVMORE на получает. Это ограничивает их данными одной детали. Намерение состоит в том, чтобы расширить API, чтобы разрешить разброс / сбор данных из нескольких частей.

с / а

Celluloid::ZMQ не сообщает ни о одном из этих типов сокетов new-API-(грех совместного использования почти простителен) в своем разделе о поддерживаемых типах сокетов, так что никаких хороших новостей ожидать не стоит априори и Celluloid::ZMQ основная активность, кажется, куда-то исчезла в 2015 году, поэтому ожидания должны быть несколько реалистичными с этой точки зрения.

Тем не менее, одна интересная точка может быть найдена за уведомлением:

прежде чем вы начнете создавать свои собственные распределенные целлулоидные системы с Celluloid::ZMQ, обязательно посмотрите на DCell и решите, подходит ли он вашим целям.


И последнее, но не менее важное: объединение системы цикла событий в другом цикле событий - трудная задача. Попытка интегрировать встроенную систему реального времени в другую систему реального времени может даже математически оказаться невозможной.

Аналогичным образом, построение многоагентной системы с использованием другого агентного компонента приносит дополнительные виды коллизий и условий гонки, если они удовлетворяют тем же ресурсам, которые используются (будь то сознательно или "просто" некоторым функциональным побочным эффектом) от обоих (несколько) агентных основ.

Невосстановимые взаимные тупики - это всего лишь один из видов этих столкновений, которые создают изначально невидимые проблемы по линии неосознанных попыток проектирования. Самый первый шаг за пределы проектирования системы с одним агентом заставляет потерять гораздо больше гарантий, которые были незаметны до того, как перейти к многоагентному (распределенному), так что открывайте умы и будьте готовы к изучению многих "новых" концепций и концентрации. многие новые проблемы, за которыми нужно внимательно следить и за которые нужно бороться, являются довольно важной предпосылкой, чтобы не (неосознанно) вводить шаблоны, которые сейчас фактически являются анти-шаблонами в области распределенных систем (многоагентных).

По крайней мере
Вы были предупреждены
: О)

Этот ответ не является хорошим решением вашей проблемы, и определенно следует тому, что предлагает пользователь 3666197. Я думаю, что это решение имеет потенциал для работы, но также в больших масштабах могут быть затраты производительности из-за перегрузки мьютекса.

Вопрос 1: Предполагая, что async порождает новый поток (каждый раз) и write_socket совместно используется всеми потоками, а zeromq говорит, что их сокет не является поточно-ориентированным. Я конечно вижу, что write_socket сталкивается с проблемой безопасности потоков. (Кстати, до сих пор не сталкивался с этой проблемой во всем сквозном тестировании.) Правильно ли мое понимание этого?

Из моего понимания документации, да, это может быть проблемой, потому что сокеты не являются потокобезопасными. Даже если проблема не возникает, она может появиться позже.

Вопрос 2: Переключение контекста может происходить (где угодно) внутри (даже в критической секции)

Да, таким образом, мы могли бы обойти это, используя мьютекс / семафор, чтобы избежать переключения контекста в неподходящее время.

Я хотел бы сделать что-то вроде этого, но может быть немного лучший подход в зависимости от того, какие вызываемые методы не являются потокобезопасными:

Celluloid::ZMQ.init
module Scp
  module DataStore
    class DataSocket
      include Celluloid::ZMQ

      def initialize
        @mutex = Mutex.new
      end

      def pull_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @read_socket = Socket::Pull.new.tap do |read_socket|
              ## IPC socket
              read_socket.connect(socket)
            end
          end
        end.join
      end

      def push_socket(socket)
        Thread.new do
          @mutex.synchronize do
            @write_socket = Socket::Push.new.tap do |write_socket|
              ## IPC socket
              write_socket.connect(socket)
            end
          end
        end.join
      end

      def run
        # Missing socket arguments here
        pull_socket and push_socket and loopify!
      end

      def loopify!
        Thread.new do
          @mutex.synchronize do
            loop {
              async.evaluate_response(read_socket.read_multipart)
            }
          end
        end.join
      end

      def evaluate_response(data)
        return_response(message_id,routing,Parser.parser(data))
      end

      def return_response(message_id,routing,object)
        data = object.to_response
        write_socket.send([message_id,routing,data])
      end
    end
  end
end

DataSocket.new.run
Другие вопросы по тегам