Как иметь одно соединение Socket.IO и несколько потоковых обновлений RTKQ на основе разных типов событий (каналов) в нескольких компонентах?

Я считаю нормальным использовать одно соединение Socket.IO в браузере, поскольку у меня есть один сервер Socket.IO, который обрабатывает оба канала информации, которые я включаю в сообщения.

Приведенный ниже код работает некорректно, это неверно с логической точки зрения. Я новичок, когда дело касается сложного асинхронного программирования.

Я не получаю ошибок, но поведение кода недостаточно хорошее.

У меня есть этот код, который я пробовал, но он не всегда получает сообщения от сервера, когда должен, и я уверен, что в нем есть некоторые ошибки. Ниже приводится часть моего кода, немного измененная, чтобы я мог опубликовать ее здесь.

Типы

      export interface INotificationsChannelInitParams {
  userIds: string[];
}

export interface ITimersChannelInitParams {
  tids: string[];
}

export interface IGetNotificationsQueryParams {
  authToken: string | null;
  userId: string | null;
  limit?: number;
}

export interface IGetTimersQueryParams {
  tids: string[];
}

Магазин

      let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;

export const getFollowedThingIds = () => {
  return uniq(followedThingIds); // `uniq` is from the `lodash` library
};

const connectHandler = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  const fn = () => {
    if (initAs === "timers") {
      const argument = arg as ITimersChannelInitParams;
      followedThingIds = argument.tids.concat(followedThingIds);
      argument.tids = uniq(followedThingIds);
      myIo!.emit("configure timers channel", argument);
      timersAreSetUp = true;
    }

    if (initAs === "notifications") {
      const argument = arg as INotificationsChannelInitParams;
      followedUserIds = argument.userIds.concat(followedUserIds);
      argument.userIds = followedUserIds;
      myIo!.emit("configure notifications channel", argument);
      notificationsAreSetUp = true;
    }
  };

  lastConnectHandler = fn;

  return fn;
};

let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  if (myIo === null) {
    // TODO: move this replacement set to its own function next to toAbsoluteUrl
    const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");

    myIo = io(wsUrl, { autoConnect: false, port: "8080" });

    myIo.onAny((event, ...args) => {
      // console.log("S.IO", event, args);
    });

    // TODO: use this somewhere so it is not dead code
    let invalidAuthToken = false;

    myIo.on("connect_error", err => {
      if (err.message === "invalid auth token") {
        invalidAuthToken = true;
      }
    });
  }

  console.log('5.16.', store.getState().auth.authToken);
  myIo.auth = { authToken: store.getState().auth.authToken };

  myIo.on("connect", connectHandler(initAs, arg));

  myIo.connect();

  return myIo;
};

export const resetFollowedUserIds = (userIds: string[]) => {
  // followedUserIds = userIds;
  // const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
  // myIo!.emit("configure notifications channel", argument);
  // notificationsAreSetUp = true;
};

// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
  if (myIo === null) {
    return;
  }

  if (uninitAs === "timers") {
    const argument = { tids: [] } as ITimersChannelInitParams;
    myIo.emit("configure timers channel", argument);
    timersAreSetUp = false;
  }

  if (uninitAs === "notifications") {
    const argument = { userIds: [] } as INotificationsChannelInitParams;
    myIo.emit("configure notifications channel", argument);
    notificationsAreSetUp = false;
  }

  // if (!timersAreSetUp && !notificationsAreSetUp) {
  //   myIo.off("connect_error");
  //   myIo.disconnect();
  //   myIo = null;
  // }
};

RTKQ

      getTimers: build.query<
    { [index: string]: number },
    IGetTimersQueryParams
  >({
    query: (params: IGetTimersQueryParams) => ({
      url: `timers?things=` + params.tids.join(","),
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      const myIo = getWebSocketConnection("timers", clone(arg));

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: number[]) => {
        updateCachedData(draft => {
          getFollowedThingIds().forEach((x: string, i: number) => {
            draft[x] = eventData[i];
          });

          // while (draft.length > 0) {
          //   draft.pop();
          // }
          // eventData.forEach((x: number, idx: number) => {
          //   // TODO: cleanup dead timers (<= 0, maybe use a call like
          //   // ws.send(JSON.stringify(arg)))
          //   draft.push(x);
          // });
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("timers", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("timers");

      myIo.off("timers", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  }),
  getNotifications: build.query<
    IDbThing[],
    IGetNotificationsQueryParams
  >({
    query: (params: IGetNotificationsQueryParams) => ({
      url: `notifications?authToken=${params.authToken || ""}&limit=${
        typeof params.limit === "number" ? params.limit : 5
      }&userId=${params.userId || ""}`,
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      // TODO: notifications for not-logged-in users?
      if (arg.userId === null) {
        return;
      }

      // TODO: here keep the previous user ID set up in the notifications
      // channel, besides the new one, and make sure each notification
      // returned by the WS server tells the user to which it should be
      // served
      const myIo = getWebSocketConnection(
        "notifications",
        clone({
          userIds: [arg.userId]
        })
      );

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: IDbNotification) => {
        // if receiving a user notification

        updateCachedData(draft => {
          draft.unshift(eventData);
          if (draft.length > 5) {
            draft.pop();
          }
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("notifications", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("notifications");
      myIo.off("notifications", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  })
})

Пример использования RTKQ

      const getNotificationsQueryParams = useMemo(
  () => ({
    authToken: user?.authToken!,
    userId: params.userId,
    limit: 0,
  }),
  [user, params.userId]
);

const {
  isLoading: notificationsIsLoading,
  isError: notificationsIsError,
  data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
  refetchOnMountOrArgChange: true
});

Обновление 1

Иногда создается второе соединение из браузера, как я вижу на вкладке «Сеть» в DevTools.

Я проверил, и проблема, похоже, связана с сервером Socket.IO: клиент отправляет на сервер сообщение «настроить канал таймеров», но ничего не получает. И сервер отправляет клиенту массив чисел, но ... похоже, не тому клиенту, хотя у меня только один экземпляр приложения открыт на одной вкладке!

Обновление 2

Я получаю подобные сообщения в основном в начале соединения от клиента. Кто они такие?

Обновление 3

Проблема на сервере.

Если я заменю эту строку:

      socket.to(socket.sessionUserId!).emit("timers", rv);

с этой строкой:

      io.of('/').emit("timers", rv);

он отправляет результат вычислений всем клиентам, включая того, которого я вижу, из которых я думаю, что это единственный клиент. В противном случае сообщение не доходит до клиента, с которым я тестирую. Я не знаю почему, потому что я проверил socket.rooms и розетка находится в комнате, обозначенной значком socket.sessionUserId и я вызываю это для каждого сокета на сервере:

      io.on("connection", (normalSocket: Socket) => {
  // a user connected

  const socket = normalSocket as SocketData;

  // join the room named by the user ID that started the socket
  socket.join(socket.sessionUserId!);

...

1 ответ

Вместо этого:

      socket.to(socket.sessionUserId!).emit("timers", rv);

или это:

      io.of('/').emit("timers", rv);

Мне нужно использовать только это:

      io.to(socket.sessionUserId!).emit("timers", rv);

Соответствующая документация находится здесь.

Другие вопросы по тегам