Поток, работающий в Middleware, использует старую версию родительской переменной экземпляра

Я использовал учебник Heroku для реализации веб-сокетов.

Он работает правильно с Thin, но не работает с Unicorn и Puma.

Также реализовано эхо-сообщение, которое отвечает на сообщение клиента. Он работает правильно на каждом сервере, поэтому нет проблем с реализацией веб-сокетов.

Настройка Redis также правильная (она перехватывает все сообщения и выполняет код внутри subscribe блок).

Как это работает сейчас:

При запуске сервера пустой @clients массив инициализируется. Затем запускается новый поток, который прослушивает Redis и предназначен для отправки этого сообщения соответствующему пользователю из массива @clients.

При загрузке страницы создается новое соединение с веб-сокетом, оно сохраняется в массиве @clients.

Если мы получаем сообщение из браузера, мы отправляем его обратно всем клиентам, связанным с одним и тем же пользователем (эта часть работает правильно как в Thin, так и в Puma).

Если мы получаем сообщение от Redis, мы также ищем все пользовательские соединения, хранящиеся в массиве @clients. Вот где происходит странная вещь:

  • При работе с Thin он находит соединения в массиве @clients и отправляет им сообщение.

  • При работе с Puma/Unicorn массив @clients всегда пуст, даже если мы пробуем его в таком порядке (без перезагрузки страницы или чего-либо еще):

    1. Отправить сообщение из браузера -> @clients.length 1, сообщение доставлено
    2. Отправить сообщение через Redis -> @clients.length 0, сообщение потеряно
    3. Отправить сообщение из браузера -> @clients.length еще 1, сообщение доставлено

Может кто-нибудь объяснить мне, чего мне не хватает?

Связанный конфиг сервера Puma:

workers 1
threads_count = 1
threads threads_count, threads_count

Связанный код промежуточного программного обеспечения:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end

2 ответа

Решение

Единорог (и, очевидно, пума) оба запускают мастер-процесс, а затем разветвляют одного или нескольких рабочих. форк-копии (или, по крайней мере, представляет иллюзию копирования - фактическая копия обычно происходит только при записи на страницы) всего процесса, но только потока, который вызвал fork существует в новом процессе.

Очевидно, что ваше приложение инициализируется перед разветвлением - обычно это делается для того, чтобы работники могли быстро начать работу и сэкономить на копировании при записи. Как следствие, ваш поток проверки redis работает только в главном процессе, тогда как @clients изменяется в дочернем процессе.

Вероятно, вы можете обойти это, либо отложив создание своего потока редиска, либо отключив предварительную загрузку приложения, однако вы должны знать, что ваши настройки не позволят вам масштабироваться за пределы одного рабочего процесса (что в случае с puma и дружественной к потокам JVM, такой как jruby) быть менее ограниченным)

На случай, если кто-то столкнется с той же проблемой, вот два решения, которые я нашел:

1. Отключите предварительную загрузку приложения (это было первое решение, которое я придумал)

Просто удалить preload_app! из файла puma.rb. Поэтому все темы будут иметь свои @clients переменная. И они будут доступны другими методами промежуточного программного обеспечения (например, call так далее.)

Недостаток: вы потеряете все преимущества предварительной загрузки приложения. Это нормально, если у вас есть только 1 или 2 рабочих с несколькими потоками, но если вам нужно их много, лучше предварительно загрузить приложение. Итак, я продолжил свои исследования, и вот еще одно решение:

2. Переместите инициализацию потока из initialize метод (это то, что я использую сейчас)

Например, я переместил его в call метод, так вот как выглядит код класса промежуточного программного обеспечения:

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{event.data}"} )
      end
    end
  end
  # other code from method
end

Оба решения решают одну и ту же проблему: поток прослушивания Redis будет инициализирован для каждого рабочего / потока Puma, а не для основного процесса (который на самом деле не обслуживает запросы).

Другие вопросы по тегам