Поток, работающий в Middleware, использует старую версию родительской переменной экземпляра
Я использовал учебник Heroku для реализации веб-сокетов.
Он работает правильно с Thin, но не работает с Unicorn и Puma.
Также реализовано эхо-сообщение, которое отвечает на сообщение клиента. Он работает правильно на каждом сервере, поэтому нет проблем с реализацией веб-сокетов.
Настройка Redis также правильная (она перехватывает все сообщения и выполняет код внутри subscribe
блок).
Как это работает сейчас:
При запуске сервера пустой @clients
массив инициализируется. Затем запускается новый поток, который прослушивает Redis и предназначен для отправки этого сообщения соответствующему пользователю из массива @clients.
При загрузке страницы создается новое соединение с веб-сокетом, оно сохраняется в массиве @clients.
Если мы получаем сообщение из браузера, мы отправляем его обратно всем клиентам, связанным с одним и тем же пользователем (эта часть работает правильно как в Thin, так и в Puma).
Если мы получаем сообщение от Redis, мы также ищем все пользовательские соединения, хранящиеся в массиве @clients. Вот где происходит странная вещь:
При работе с Thin он находит соединения в массиве @clients и отправляет им сообщение.
При работе с Puma/Unicorn массив @clients всегда пуст, даже если мы пробуем его в таком порядке (без перезагрузки страницы или чего-либо еще):
- Отправить сообщение из браузера ->
@clients.length
1, сообщение доставлено - Отправить сообщение через Redis ->
@clients.length
0, сообщение потеряно - Отправить сообщение из браузера ->
@clients.length
еще 1, сообщение доставлено
- Отправить сообщение из браузера ->
Может кто-нибудь объяснить мне, чего мне не хватает?
Связанный конфиг сервера Puma:
workers 1
threads_count = 1
threads threads_count, threads_count
Связанный код промежуточного программного обеспечения:
require 'faye/websocket'
class NotificationsBackend
def initialize(app)
@app = app
@clients = []
Thread.new do
redis_sub = Redis.new
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
# logging @clients.length from here will always return 0
# [..] retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
# [..] retrieve current user
if user
# add ws connection to @clients array
else
# close ws
end
end
ws.on :message do |event|
# [..] retrieve current user
Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
end
ws.rack_response
else
@app.call(env)
end
end
def send_message user_id, message
# logging @clients.length here will always return correct result
# cs = all connections which belong to that client
cs.each { |c| c.send(message.to_json) }
end
end
2 ответа
Единорог (и, очевидно, пума) оба запускают мастер-процесс, а затем разветвляют одного или нескольких рабочих. форк-копии (или, по крайней мере, представляет иллюзию копирования - фактическая копия обычно происходит только при записи на страницы) всего процесса, но только потока, который вызвал fork
существует в новом процессе.
Очевидно, что ваше приложение инициализируется перед разветвлением - обычно это делается для того, чтобы работники могли быстро начать работу и сэкономить на копировании при записи. Как следствие, ваш поток проверки redis работает только в главном процессе, тогда как @clients
изменяется в дочернем процессе.
Вероятно, вы можете обойти это, либо отложив создание своего потока редиска, либо отключив предварительную загрузку приложения, однако вы должны знать, что ваши настройки не позволят вам масштабироваться за пределы одного рабочего процесса (что в случае с puma и дружественной к потокам JVM, такой как jruby) быть менее ограниченным)
На случай, если кто-то столкнется с той же проблемой, вот два решения, которые я нашел:
1. Отключите предварительную загрузку приложения (это было первое решение, которое я придумал)
Просто удалить preload_app!
из файла puma.rb. Поэтому все темы будут иметь свои @clients
переменная. И они будут доступны другими методами промежуточного программного обеспечения (например, call
так далее.)
Недостаток: вы потеряете все преимущества предварительной загрузки приложения. Это нормально, если у вас есть только 1 или 2 рабочих с несколькими потоками, но если вам нужно их много, лучше предварительно загрузить приложение. Итак, я продолжил свои исследования, и вот еще одно решение:
2. Переместите инициализацию потока из initialize
метод (это то, что я использую сейчас)
Например, я переместил его в call
метод, так вот как выглядит код класса промежуточного программного обеспечения:
attr_accessor :subscriber
def call(env)
@subscriber ||= Thread.new do # if no subscriber present, init new one
redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
redis_sub.subscribe(CHANNEL) do |on|
on.message do |_, msg|
# parsing message code here, retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
# other code from method
end
Оба решения решают одну и ту же проблему: поток прослушивания Redis будет инициализирован для каждого рабочего / потока Puma, а не для основного процесса (который на самом деле не обслуживает запросы).