Каналы Phoenix - несколько каналов на сокет

Я пишу приложение, использующее Elixir Channels для обработки событий в реальном времени. Я понимаю, что для каждого клиента будет открыт 1 сокет, и он может мультиплексировать несколько каналов. Итак, мое приложение представляет собой приложение чата, где пользователи являются частью нескольких групповых чатов. У меня есть 1 канал Phoenix под названием MessageChannel, где метод соединения будет обрабатывать темы, связанные с дианамией.

def join("groups:" <> group_id, payload, socket) do
....

Допустим, Джон присоединяется к группам / темам A и B, а Боб присоединяется только к группе / темам B. Когда Джон отправляет сообщение группе / теме A, широковещательная рассылка!/3 также отправляет это сообщение Бобу слишком правильно? Поскольку handle_in не имеет контекста, в какую тему / группу было отправлено сообщение.

Как бы я справился с этим, чтобы Боб не получил события, отправленные в группу А. Правильно ли я проектирую это?

3 ответа

Решение

Так как handle_in не имеет контекста, в какую тему / группу было отправлено сообщение.

когда Phoenix.Channel.broadcast/3 называется, по-видимому , имеет тему, связанную с сообщением (что не очевидно из подписи). Вы можете увидеть код, начинающийся с этой строки channel.ex:

def broadcast(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast pubsub_server, topic, event, message
end

Поэтому, когда звонок broadcast/3 выполняется с использованием сокета, шаблон соответствует текущей теме, а затем вызывает основной Server.broadcast/4,

(Если вам любопытно, как я, это, в свою очередь, делает звонок в основной PubSub.broadcast/3 который делает некоторую магию распространения для маршрутизации вызова на ваш сконфигурированный сервер реализации pubsub, скорее всего с использованием pg2, но я отвлекся...)

Итак, я обнаружил, что это поведение не очевидно из чтения Phoenix.Channel документы, но они явно указывают это на странице каналов phoenixframework в Incoming Events:

broadcast!/3 уведомит всех присоединившихся клиентов по теме этого сокета и вызовет их handle_out/3 Обратные вызовы.

Так что это только транслируется "на тему этого сокета". Они определяют тему на той же странице как:

topic - Строковая тема или тема: пространство имен подтемных пар, например "messages", "messages:123"

Таким образом, в вашем примере "themes" - это на самом деле тема: строки пространства имен субтопических пар: "groups:A" а также "groups:B", Джон должен был бы подписаться на обе эти темы по отдельности на клиенте, поэтому у вас фактически будут ссылки на два разных канала, даже если они используют один и тот же сокет. Итак, если вы используете клиент javascript, создание канала выглядит примерно так:

let channelA = this.socket.channel("groups:A", {});
let channelB = this.socket.channel("groups:B", {});

Затем, когда вы отправляете сообщение на канал от клиента, вы используете только тот канал, у которого есть тема, для которой шаблон сопоставляется с сервером, как мы видели выше.

channelA.push(msgName, msgBody);

На самом деле, маршрутизация сокетов основана на том, как определить ваши темы в ваших проектах. Модуль Socket с помощью channel API. Для моего клона Slack я использую три канала. У меня есть канал системного уровня для управления обновлением присутствия, канал пользователя и канал комнаты.

Любой данный пользователь подписан на 0 или 1 канал. Тем не менее, пользователи могут быть подписаны на несколько каналов.

Для сообщений, поступающих в определенную комнату, я транслирую их по каналу комнаты.

Когда я обнаруживаю непрочитанные сообщения, уведомления или значки для определенной комнаты, я использую канал пользователя. Каждый канал пользователя хранит список комнат, на которые подписан пользователь (они перечислены на боковой панели клиента).

Хитрость во всем этом заключается в использовании нескольких API каналов, в основном intercept, handle_out, My.Endpoint.subscribe, а также handle_info(%Broadcast{},socket),

  • я использую intercept ловить передаваемые сообщения, которые я хочу либо игнорировать, либо манипулировать ими перед отправкой.
  • В канале пользователя я подписываюсь на трансляции событий из канала комнаты
  • Когда вы подписываетесь, вы получаете handle_info позвонить с %Broadcast{} структура, которая включает тему, событие и полезную нагрузку передаваемого сообщения.

Вот пара частей моего кода:

defmodule UcxChat.UserSocket do
  use Phoenix.Socket
  alias UcxChat.{User, Repo, MessageService, SideNavService}
  require UcxChat.ChatConstants, as: CC

  ## Channels
  channel CC.chan_room <> "*", UcxChat.RoomChannel    # "ucxchat:"
  channel CC.chan_user <> "*", UcxChat.UserChannel  # "user:"
  channel CC.chan_system <> "*", UcxChat.SystemChannel  # "system:"
  # ...
end

# user_channel.ex
 # ...
 intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
 #...
 def handle_out("room:join", msg, socket) do
    %{room: room} = msg
    UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
    update_rooms_list(socket)
    clear_unreads(room, socket)
    {:noreply, subscribe([room], socket)}
  end
  def handle_out("room:leave" = ev, msg, socket) do
    %{room: room} = msg
    debug ev, msg, "assigns: #{inspect socket.assigns}"
    socket.endpoint.unsubscribe(CC.chan_room <> room)
    update_rooms_list(socket)
    {:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
  end

  # ...
  defp subscribe(channels, socket) do
    # debug inspect(channels), ""
    Enum.reduce channels, socket, fn channel, acc ->
      subscribed = acc.assigns[:subscribed]
      if channel in subscribed do
        acc
      else
        socket.endpoint.subscribe(CC.chan_room <> channel)
        assign(acc, :subscribed, [channel | subscribed])
      end
    end
  end
  # ...
end

Я также использую user_channel для всех событий, связанных с конкретным пользователем, таких как состояние клиента, сообщения об ошибках и т. Д.

Отказ от ответственности: я не смотрел на внутреннюю работу канала, эта информация полностью из моего первого опыта использования каналов в приложении.

Когда кто-то присоединяется к другой группе (в зависимости от join/3), устанавливается соединение по отдельному каналу (разъему). Таким образом, вещание на А не будет отправлять сообщения членам Б, только А.

Мне кажется модуль канала похож на GenServer и соединение чем-то похоже start_linkгде новый сервер (процесс) запускается (однако, только если он еще не существует).

Вы действительно можете игнорировать внутреннюю работу модуля и просто понимать, что если вы присоединяетесь к каналу с именем, отличным от уже существующих, вы присоединяетесь к уникальному каналу. Вы также можете просто поверить, что если вы вещаете на канал, сообщение получат только участники этого канала.

Например, в моем приложении у меня есть пользовательский канал, к которому я хочу подключить только одного пользователя. Объединение выглядит как def join("agent:" <> _agent, payload, socket) где агент просто адрес электронной почты. Когда я передаю сообщение на этот канал, только один агент получает сообщение. У меня также есть служебный канал, к которому присоединяются все агенты, и я транслирую на него, когда хочу, чтобы все агенты получили сообщение.

Надеюсь это поможет.

Другие вопросы по тегам