如何根据多个组件中的不同事件类型(通道)建立单个 Socket.IO 连接和多个流式更新 RTKQ?
How to have a single Socket.IO connection and multiple streaming-updates RTKQ's based on different event types (channels) in multiple components?
我认为在浏览器中使用单个 Socket.IO 连接是正常的,因为我有一个 Socket.IO 服务器来处理消息中包含的两个信息通道。
下面的代码运行不正常,逻辑上不正确。对于复杂的异步编程,我是初学者。
我没有收到错误,但代码的行为不够好。
我试过这段代码,但它并不总是在应该的时候从服务器接收消息,我确信它有一些错误。以下是我的代码的一部分,稍微修改了一下,以便我可以 post 这里。
类型
export interface INotificationsChannelInitParams {
userIds: string[];
}
export interface ITimersChannelInitParams {
tids: string[];
}
export interface IGetNotificationsQueryParams {
authToken: string | null;
userId: string | null;
limit?: number;
}
export interface IGetTimersQueryParams {
tids: string[];
}
店铺
let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;
export const getFollowedThingIds = () => {
return uniq(followedThingIds); // `uniq` is from the `lodash` library
};
const connectHandler = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
const fn = () => {
if (initAs === "timers") {
const argument = arg as ITimersChannelInitParams;
followedThingIds = argument.tids.concat(followedThingIds);
argument.tids = uniq(followedThingIds);
myIo!.emit("configure timers channel", argument);
timersAreSetUp = true;
}
if (initAs === "notifications") {
const argument = arg as INotificationsChannelInitParams;
followedUserIds = argument.userIds.concat(followedUserIds);
argument.userIds = followedUserIds;
myIo!.emit("configure notifications channel", argument);
notificationsAreSetUp = true;
}
};
lastConnectHandler = fn;
return fn;
};
let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
if (myIo === null) {
// TODO: move this replacement set to its own function next to toAbsoluteUrl
const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");
myIo = io(wsUrl, { autoConnect: false, port: "8080" });
myIo.onAny((event, ...args) => {
// console.log("S.IO", event, args);
});
// TODO: use this somewhere so it is not dead code
let invalidAuthToken = false;
myIo.on("connect_error", err => {
if (err.message === "invalid auth token") {
invalidAuthToken = true;
}
});
}
console.log('5.16.', store.getState().auth.authToken);
myIo.auth = { authToken: store.getState().auth.authToken };
myIo.on("connect", connectHandler(initAs, arg));
myIo.connect();
return myIo;
};
export const resetFollowedUserIds = (userIds: string[]) => {
// followedUserIds = userIds;
// const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
// myIo!.emit("configure notifications channel", argument);
// notificationsAreSetUp = true;
};
// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
if (myIo === null) {
return;
}
if (uninitAs === "timers") {
const argument = { tids: [] } as ITimersChannelInitParams;
myIo.emit("configure timers channel", argument);
timersAreSetUp = false;
}
if (uninitAs === "notifications") {
const argument = { userIds: [] } as INotificationsChannelInitParams;
myIo.emit("configure notifications channel", argument);
notificationsAreSetUp = false;
}
// if (!timersAreSetUp && !notificationsAreSetUp) {
// myIo.off("connect_error");
// myIo.disconnect();
// myIo = null;
// }
};
RTKQ 的
getTimers: build.query<
{ [index: string]: number },
IGetTimersQueryParams
>({
query: (params: IGetTimersQueryParams) => ({
url: `timers?things=` + params.tids.join(","),
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const myIo = getWebSocketConnection("timers", clone(arg));
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: number[]) => {
updateCachedData(draft => {
getFollowedThingIds().forEach((x: string, i: number) => {
draft[x] = eventData[i];
});
// while (draft.length > 0) {
// draft.pop();
// }
// eventData.forEach((x: number, idx: number) => {
// // TODO: cleanup dead timers (<= 0, maybe use a call like
// // ws.send(JSON.stringify(arg)))
// draft.push(x);
// });
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("timers", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("timers");
myIo.off("timers", listener);
myIo.off("connect", lastConnectHandler as any);
}
}),
getNotifications: build.query<
IDbThing[],
IGetNotificationsQueryParams
>({
query: (params: IGetNotificationsQueryParams) => ({
url: `notifications?authToken=${params.authToken || ""}&limit=${
typeof params.limit === "number" ? params.limit : 5
}&userId=${params.userId || ""}`,
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// TODO: notifications for not-logged-in users?
if (arg.userId === null) {
return;
}
// TODO: here keep the previous user ID set up in the notifications
// channel, besides the new one, and make sure each notification
// returned by the WS server tells the user to which it should be
// served
const myIo = getWebSocketConnection(
"notifications",
clone({
userIds: [arg.userId]
})
);
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: IDbNotification) => {
// if receiving a user notification
updateCachedData(draft => {
draft.unshift(eventData);
if (draft.length > 5) {
draft.pop();
}
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("notifications", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("notifications");
myIo.off("notifications", listener);
myIo.off("connect", lastConnectHandler as any);
}
})
})
RTKQ 的用法示例
const getNotificationsQueryParams = useMemo(
() => ({
authToken: user?.authToken!,
userId: params.userId,
limit: 0,
}),
[user, params.userId]
);
const {
isLoading: notificationsIsLoading,
isError: notificationsIsError,
data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
refetchOnMountOrArgChange: true
});
更新 1
有时会创建来自浏览器的第二个连接,正如我在 DevTools 的“网络”选项卡中看到的那样。
我已经检查过,问题似乎出在 Socket.IO 服务器上:客户端向服务器发送“配置计时器通道”消息,但它什么也没收到。服务器向客户端发送了一组数字,但是...似乎发送给了错误的客户端,尽管我在一个选项卡中只打开了一个应用程序实例!
更新 2
我主要在客户端连接开始时收到这样的消息。它们是什么?
更新 3
服务器有问题。
如果我替换这一行:
socket.to(socket.sessionUserId!).emit("timers", rv);
这一行:
io.of('/').emit("timers", rv);
它将计算结果发送给所有客户端,包括我能看到的那个,我认为它是唯一的客户端。如果不是,则消息不会到达我正在测试的客户端。我不知道为什么,因为我检查了 socket.rooms
并且套接字在 socket.sessionUserId
标识的房间中,我为服务器上的每个套接字都调用了它:
io.on("connection", (normalSocket: Socket) => {
// a user connected
const socket = normalSocket as SocketData;
// join the room named by the user ID that started the socket
socket.join(socket.sessionUserId!);
...
而不是这个:
socket.to(socket.sessionUserId!).emit("timers", rv);
或者这个:
io.of('/').emit("timers", rv);
我只需要用这个:
io.to(socket.sessionUserId!).emit("timers", rv);
相关文档是here。
我认为在浏览器中使用单个 Socket.IO 连接是正常的,因为我有一个 Socket.IO 服务器来处理消息中包含的两个信息通道。
下面的代码运行不正常,逻辑上不正确。对于复杂的异步编程,我是初学者。
我没有收到错误,但代码的行为不够好。
我试过这段代码,但它并不总是在应该的时候从服务器接收消息,我确信它有一些错误。以下是我的代码的一部分,稍微修改了一下,以便我可以 post 这里。
类型
export interface INotificationsChannelInitParams {
userIds: string[];
}
export interface ITimersChannelInitParams {
tids: string[];
}
export interface IGetNotificationsQueryParams {
authToken: string | null;
userId: string | null;
limit?: number;
}
export interface IGetTimersQueryParams {
tids: string[];
}
店铺
let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;
export const getFollowedThingIds = () => {
return uniq(followedThingIds); // `uniq` is from the `lodash` library
};
const connectHandler = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
const fn = () => {
if (initAs === "timers") {
const argument = arg as ITimersChannelInitParams;
followedThingIds = argument.tids.concat(followedThingIds);
argument.tids = uniq(followedThingIds);
myIo!.emit("configure timers channel", argument);
timersAreSetUp = true;
}
if (initAs === "notifications") {
const argument = arg as INotificationsChannelInitParams;
followedUserIds = argument.userIds.concat(followedUserIds);
argument.userIds = followedUserIds;
myIo!.emit("configure notifications channel", argument);
notificationsAreSetUp = true;
}
};
lastConnectHandler = fn;
return fn;
};
let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
initAs: "timers" | "notifications",
arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
if (myIo === null) {
// TODO: move this replacement set to its own function next to toAbsoluteUrl
const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");
myIo = io(wsUrl, { autoConnect: false, port: "8080" });
myIo.onAny((event, ...args) => {
// console.log("S.IO", event, args);
});
// TODO: use this somewhere so it is not dead code
let invalidAuthToken = false;
myIo.on("connect_error", err => {
if (err.message === "invalid auth token") {
invalidAuthToken = true;
}
});
}
console.log('5.16.', store.getState().auth.authToken);
myIo.auth = { authToken: store.getState().auth.authToken };
myIo.on("connect", connectHandler(initAs, arg));
myIo.connect();
return myIo;
};
export const resetFollowedUserIds = (userIds: string[]) => {
// followedUserIds = userIds;
// const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
// myIo!.emit("configure notifications channel", argument);
// notificationsAreSetUp = true;
};
// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
if (myIo === null) {
return;
}
if (uninitAs === "timers") {
const argument = { tids: [] } as ITimersChannelInitParams;
myIo.emit("configure timers channel", argument);
timersAreSetUp = false;
}
if (uninitAs === "notifications") {
const argument = { userIds: [] } as INotificationsChannelInitParams;
myIo.emit("configure notifications channel", argument);
notificationsAreSetUp = false;
}
// if (!timersAreSetUp && !notificationsAreSetUp) {
// myIo.off("connect_error");
// myIo.disconnect();
// myIo = null;
// }
};
RTKQ 的
getTimers: build.query<
{ [index: string]: number },
IGetTimersQueryParams
>({
query: (params: IGetTimersQueryParams) => ({
url: `timers?things=` + params.tids.join(","),
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const myIo = getWebSocketConnection("timers", clone(arg));
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: number[]) => {
updateCachedData(draft => {
getFollowedThingIds().forEach((x: string, i: number) => {
draft[x] = eventData[i];
});
// while (draft.length > 0) {
// draft.pop();
// }
// eventData.forEach((x: number, idx: number) => {
// // TODO: cleanup dead timers (<= 0, maybe use a call like
// // ws.send(JSON.stringify(arg)))
// draft.push(x);
// });
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("timers", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("timers");
myIo.off("timers", listener);
myIo.off("connect", lastConnectHandler as any);
}
}),
getNotifications: build.query<
IDbThing[],
IGetNotificationsQueryParams
>({
query: (params: IGetNotificationsQueryParams) => ({
url: `notifications?authToken=${params.authToken || ""}&limit=${
typeof params.limit === "number" ? params.limit : 5
}&userId=${params.userId || ""}`,
method: "GET"
}),
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// TODO: notifications for not-logged-in users?
if (arg.userId === null) {
return;
}
// TODO: here keep the previous user ID set up in the notifications
// channel, besides the new one, and make sure each notification
// returned by the WS server tells the user to which it should be
// served
const myIo = getWebSocketConnection(
"notifications",
clone({
userIds: [arg.userId]
})
);
// when data is received from the socket connection to the server,
// if it is a valid JSON object, update our query result with the
// received message
const listener = (eventData: IDbNotification) => {
// if receiving a user notification
updateCachedData(draft => {
draft.unshift(eventData);
if (draft.length > 5) {
draft.pop();
}
});
};
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded;
myIo.on("notifications", listener);
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved;
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
// closeWebSocketConnection("notifications");
myIo.off("notifications", listener);
myIo.off("connect", lastConnectHandler as any);
}
})
})
RTKQ 的用法示例
const getNotificationsQueryParams = useMemo(
() => ({
authToken: user?.authToken!,
userId: params.userId,
limit: 0,
}),
[user, params.userId]
);
const {
isLoading: notificationsIsLoading,
isError: notificationsIsError,
data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
refetchOnMountOrArgChange: true
});
更新 1
有时会创建来自浏览器的第二个连接,正如我在 DevTools 的“网络”选项卡中看到的那样。
我已经检查过,问题似乎出在 Socket.IO 服务器上:客户端向服务器发送“配置计时器通道”消息,但它什么也没收到。服务器向客户端发送了一组数字,但是...似乎发送给了错误的客户端,尽管我在一个选项卡中只打开了一个应用程序实例!
更新 2
我主要在客户端连接开始时收到这样的消息。它们是什么?
更新 3
服务器有问题。
如果我替换这一行:
socket.to(socket.sessionUserId!).emit("timers", rv);
这一行:
io.of('/').emit("timers", rv);
它将计算结果发送给所有客户端,包括我能看到的那个,我认为它是唯一的客户端。如果不是,则消息不会到达我正在测试的客户端。我不知道为什么,因为我检查了 socket.rooms
并且套接字在 socket.sessionUserId
标识的房间中,我为服务器上的每个套接字都调用了它:
io.on("connection", (normalSocket: Socket) => {
// a user connected
const socket = normalSocket as SocketData;
// join the room named by the user ID that started the socket
socket.join(socket.sessionUserId!);
...
而不是这个:
socket.to(socket.sessionUserId!).emit("timers", rv);
或者这个:
io.of('/').emit("timers", rv);
我只需要用这个:
io.to(socket.sessionUserId!).emit("timers", rv);
相关文档是here。