Как иметь одно соединение Socket.IO и несколько потоковых обновлений RTKQ на основе разных типов событий (каналов) в нескольких компонентах?
Я считаю нормальным использовать одно соединение Socket.IO в браузере, поскольку у меня есть один сервер Socket.IO, который обрабатывает оба канала информации, которые я включаю в сообщения.
Приведенный ниже код работает некорректно, это неверно с логической точки зрения. Я новичок, когда дело касается сложного асинхронного программирования.
Я не получаю ошибок, но поведение кода недостаточно хорошее.
У меня есть этот код, который я пробовал, но он не всегда получает сообщения от сервера, когда должен, и я уверен, что в нем есть некоторые ошибки. Ниже приводится часть моего кода, немного измененная, чтобы я мог опубликовать ее здесь.
Типы
export interface INotificationsChannelInitParams {
userIds: string[];
}
export interface ITimersChannelInitParams {
tids: string[];
}
export interface IGetNotificationsQueryParams {
authToken: string | null;
userId: string | null;
limit?: number;
}
export interface IGetTimersQueryParams {
tids: string[];
}
Магазин
let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;
export const getFollowedThingIds = () => {
return uniq(followedThingIds); // `uniq` is from the `lodash` library
};
const connectHandler = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
const fn = () => {
if (initAs === "timers") {
const argument = arg as ITimersChannelInitParams;
followedThingIds = argument.tids.concat(followedThingIds);
argument.tids = uniq(followedThingIds);
myIo!.emit("configure timers channel", argument);
timersAreSetUp = true;
}
if (initAs === "notifications") {
const argument = arg as INotificationsChannelInitParams;
followedUserIds = argument.userIds.concat(followedUserIds);
argument.userIds = followedUserIds;
myIo!.emit("configure notifications channel", argument);
notificationsAreSetUp = true;
}
};
lastConnectHandler = fn;
return fn;
};
let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
if (myIo === null) {
// TODO: move this replacement set to its own function next to toAbsoluteUrl
const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");
myIo = io(wsUrl, { autoConnect: false, port: "8080" });
myIo.onAny((event, ...args) => {
// console.log("S.IO", event, args);
});
// TODO: use this somewhere so it is not dead code
let invalidAuthToken = false;
myIo.on("connect_error", err => {
if (err.message === "invalid auth token") {
invalidAuthToken = true;
}
});
}
console.log('5.16.', store.getState().auth.authToken);
myIo.auth = { authToken: store.getState().auth.authToken };
myIo.on("connect", connectHandler(initAs, arg));
myIo.connect();
return myIo;
};
export const resetFollowedUserIds = (userIds: string[]) => {
// followedUserIds = userIds;
// const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
// myIo!.emit("configure notifications channel", argument);
// notificationsAreSetUp = true;
};
// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
if (myIo === null) {
return;
}
if (uninitAs === "timers") {
const argument = { tids: [] } as ITimersChannelInitParams;
myIo.emit("configure timers channel", argument);
timersAreSetUp = false;
}
if (uninitAs === "notifications") {
const argument = { userIds: [] } as INotificationsChannelInitParams;
myIo.emit("configure notifications channel", argument);
notificationsAreSetUp = false;
}
// if (!timersAreSetUp && !notificationsAreSetUp) {
// myIo.off("connect_error");
// myIo.disconnect();
// myIo = null;
// }
};
RTKQ
getTimers: build.query<
{ [index: string]: number },
IGetTimersQueryParams
>({
query: (params: IGetTimersQueryParams) => ({
url: `timers?things=` + params.tids.join(","),
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const myIo = getWebSocketConnection("timers", clone(arg));
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: number[]) => {
updateCachedData(draft => {
getFollowedThingIds().forEach((x: string, i: number) => {
draft[x] = eventData[i];
});
// while (draft.length > 0) {
// draft.pop();
// }
// eventData.forEach((x: number, idx: number) => {
// // TODO: cleanup dead timers (<= 0, maybe use a call like
// // ws.send(JSON.stringify(arg)))
// draft.push(x);
// });
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("timers", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("timers");
myIo.off("timers", listener);
myIo.off("connect", lastConnectHandler as any);
}
}),
getNotifications: build.query<
IDbThing[],
IGetNotificationsQueryParams
>({
query: (params: IGetNotificationsQueryParams) => ({
url: `notifications?authToken=${params.authToken || ""}&limit=${
typeof params.limit === "number" ? params.limit : 5
}&userId=${params.userId || ""}`,
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// TODO: notifications for not-logged-in users?
if (arg.userId === null) {
return;
}
// TODO: here keep the previous user ID set up in the notifications
// channel, besides the new one, and make sure each notification
// returned by the WS server tells the user to which it should be
// served
const myIo = getWebSocketConnection(
"notifications",
clone({
userIds: [arg.userId]
})
);
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: IDbNotification) => {
// if receiving a user notification
updateCachedData(draft => {
draft.unshift(eventData);
if (draft.length > 5) {
draft.pop();
}
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("notifications", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("notifications");
myIo.off("notifications", listener);
myIo.off("connect", lastConnectHandler as any);
}
})
})
Пример использования RTKQ
const getNotificationsQueryParams = useMemo(
() => ({
authToken: user?.authToken!,
userId: params.userId,
limit: 0,
}),
[user, params.userId]
);
const {
isLoading: notificationsIsLoading,
isError: notificationsIsError,
data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
refetchOnMountOrArgChange: true
});
Обновление 1
Иногда создается второе соединение из браузера, как я вижу на вкладке «Сеть» в DevTools.
Я проверил, и проблема, похоже, связана с сервером Socket.IO: клиент отправляет на сервер сообщение «настроить канал таймеров», но ничего не получает. И сервер отправляет клиенту массив чисел, но ... похоже, не тому клиенту, хотя у меня только один экземпляр приложения открыт на одной вкладке!
Обновление 2
Я получаю подобные сообщения в основном в начале соединения от клиента. Кто они такие?
Обновление 3
Проблема на сервере.
Если я заменю эту строку:
socket.to(socket.sessionUserId!).emit("timers", rv);
с этой строкой:
io.of('/').emit("timers", rv);
он отправляет результат вычислений всем клиентам, включая того, которого я вижу, из которых я думаю, что это единственный клиент. В противном случае сообщение не доходит до клиента, с которым я тестирую. Я не знаю почему, потому что я проверил
socket.rooms
и розетка находится в комнате, обозначенной значком
socket.sessionUserId
и я вызываю это для каждого сокета на сервере:
io.on("connection", (normalSocket: Socket) => {
// a user connected
const socket = normalSocket as SocketData;
// join the room named by the user ID that started the socket
socket.join(socket.sessionUserId!);
...
1 ответ
Вместо этого:
socket.to(socket.sessionUserId!).emit("timers", rv);
или это:
io.of('/').emit("timers", rv);
Мне нужно использовать только это:
io.to(socket.sessionUserId!).emit("timers", rv);
Соответствующая документация находится здесь.