如何根据多个组件中的不同事件类型(通道)建立单个 Socket.IO 连接和多个流式更新 RTKQ?

How to have a single Socket.IO connection and multiple streaming-updates RTKQ's based on different event types (channels) in multiple components?

我认为在浏览器中使用单个 Socket.IO 连接是正常的,因为我有一个 Socket.IO 服务器来处理消息中包含的两个信息通道。

下面的代码运行不正常,逻辑上不正确。对于复杂的异步编程,我是初学者。

我没有收到错误,但代码的行为不够好。

我试过这段代码,但它并不总是在应该的时候从服务器接收消息,我确信它有一些错误。以下是我的代码的一部分,稍微修改了一下,以便我可以 post 这里。

类型

export interface INotificationsChannelInitParams {
  userIds: string[];
}

export interface ITimersChannelInitParams {
  tids: string[];
}

export interface IGetNotificationsQueryParams {
  authToken: string | null;
  userId: string | null;
  limit?: number;
}

export interface IGetTimersQueryParams {
  tids: string[];
}

店铺

let lastConnectHandler: ReturnType<typeof connectHandler> | null = null;

export const getFollowedThingIds = () => {
  return uniq(followedThingIds); // `uniq` is from the `lodash` library
};

const connectHandler = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  const fn = () => {
    if (initAs === "timers") {
      const argument = arg as ITimersChannelInitParams;
      followedThingIds = argument.tids.concat(followedThingIds);
      argument.tids = uniq(followedThingIds);
      myIo!.emit("configure timers channel", argument);
      timersAreSetUp = true;
    }

    if (initAs === "notifications") {
      const argument = arg as INotificationsChannelInitParams;
      followedUserIds = argument.userIds.concat(followedUserIds);
      argument.userIds = followedUserIds;
      myIo!.emit("configure notifications channel", argument);
      notificationsAreSetUp = true;
    }
  };

  lastConnectHandler = fn;

  return fn;
};

let myIo: Socket | null = null;
let timersAreSetUp = false;
let notificationsAreSetUp = false;
let followedUserIds: string[] = [];
let followedThingIds: string[] = [];
export const getWebSocketConnection = (
  initAs: "timers" | "notifications",
  arg: INotificationsChannelInitParams | ITimersChannelInitParams
) => {
  if (myIo === null) {
    // TODO: move this replacement set to its own function next to toAbsoluteUrl
    const wsUrl = toAbsoluteUrl("/").replace(/\:(\d)+(\/)+$/g, ":8080/");

    myIo = io(wsUrl, { autoConnect: false, port: "8080" });

    myIo.onAny((event, ...args) => {
      // console.log("S.IO", event, args);
    });

    // TODO: use this somewhere so it is not dead code
    let invalidAuthToken = false;

    myIo.on("connect_error", err => {
      if (err.message === "invalid auth token") {
        invalidAuthToken = true;
      }
    });
  }

  console.log('5.16.', store.getState().auth.authToken);
  myIo.auth = { authToken: store.getState().auth.authToken };

  myIo.on("connect", connectHandler(initAs, arg));

  myIo.connect();

  return myIo;
};

export const resetFollowedUserIds = (userIds: string[]) => {
  // followedUserIds = userIds;
  // const argument = { userIds: followedUserIds } as INotificationsChannelInitParams;
  // myIo!.emit("configure notifications channel", argument);
  // notificationsAreSetUp = true;
};

// TODO: use this function so that the followed things (for timers) and
// users (for notifications) don't add up
const closeWebSocketConnection = (uninitAs: "timers" | "notifications") => {
  if (myIo === null) {
    return;
  }

  if (uninitAs === "timers") {
    const argument = { tids: [] } as ITimersChannelInitParams;
    myIo.emit("configure timers channel", argument);
    timersAreSetUp = false;
  }

  if (uninitAs === "notifications") {
    const argument = { userIds: [] } as INotificationsChannelInitParams;
    myIo.emit("configure notifications channel", argument);
    notificationsAreSetUp = false;
  }

  // if (!timersAreSetUp && !notificationsAreSetUp) {
  //   myIo.off("connect_error");
  //   myIo.disconnect();
  //   myIo = null;
  // }
};

RTKQ 的

getTimers: build.query<
    { [index: string]: number },
    IGetTimersQueryParams
  >({
    query: (params: IGetTimersQueryParams) => ({
      url: `timers?things=` + params.tids.join(","),
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      const myIo = getWebSocketConnection("timers", clone(arg));

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: number[]) => {
        updateCachedData(draft => {
          getFollowedThingIds().forEach((x: string, i: number) => {
            draft[x] = eventData[i];
          });

          // while (draft.length > 0) {
          //   draft.pop();
          // }
          // eventData.forEach((x: number, idx: number) => {
          //   // TODO: cleanup dead timers (<= 0, maybe use a call like
          //   // ws.send(JSON.stringify(arg)))
          //   draft.push(x);
          // });
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("timers", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("timers");

      myIo.off("timers", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  }),
  getNotifications: build.query<
    IDbThing[],
    IGetNotificationsQueryParams
  >({
    query: (params: IGetNotificationsQueryParams) => ({
      url: `notifications?authToken=${params.authToken || ""}&limit=${
        typeof params.limit === "number" ? params.limit : 5
      }&userId=${params.userId || ""}`,
      method: "GET"
    }),
    async onCacheEntryAdded(
      arg,
      { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
    ) {
      // TODO: notifications for not-logged-in users?
      if (arg.userId === null) {
        return;
      }

      // TODO: here keep the previous user ID set up in the notifications
      // channel, besides the new one, and make sure each notification
      // returned by the WS server tells the user to which it should be
      // served
      const myIo = getWebSocketConnection(
        "notifications",
        clone({
          userIds: [arg.userId]
        })
      );

      // when data is received from the socket connection to the server,
      // if it is a valid JSON object, update our query result with the
      // received message
      const listener = (eventData: IDbNotification) => {
        // if receiving a user notification

        updateCachedData(draft => {
          draft.unshift(eventData);
          if (draft.length > 5) {
            draft.pop();
          }
        });
      };

      try {
        // wait for the initial query to resolve before proceeding
        await cacheDataLoaded;

        myIo.on("notifications", listener);
      } catch {
        // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
        // in which case `cacheDataLoaded` will throw
      }

      // cacheEntryRemoved will resolve when the cache subscription is no longer active
      await cacheEntryRemoved;

      // perform cleanup steps once the `cacheEntryRemoved` promise resolves
      // closeWebSocketConnection("notifications");
      myIo.off("notifications", listener);
      myIo.off("connect", lastConnectHandler as any);
    }
  })
})

RTKQ 的用法示例

const getNotificationsQueryParams = useMemo(
  () => ({
    authToken: user?.authToken!,
    userId: params.userId,
    limit: 0,
  }),
  [user, params.userId]
);

const {
  isLoading: notificationsIsLoading,
  isError: notificationsIsError,
  data: shownNotifications
} = useGetNotificationsQuery(getNotificationsQueryParams, {
  refetchOnMountOrArgChange: true
});

更新 1

有时会创建来自浏览器的第二个连接,正如我在 DevTools 的“网络”选项卡中看到的那样。

我已经检查过,问题似乎出在 Socket.IO 服务器上:客户端向服务器发送“配置计时器通道”消息,但它什么也没收到。服务器向客户端发送了一组数字,但是...似乎发送给了错误的客户端,尽管我在一个选项卡中只打开了一个应用程序实例!

更新 2

我主要在客户端连接开始时收到这样的消息。它们是什么?

更新 3

服务器有问题。

如果我替换这一行:

socket.to(socket.sessionUserId!).emit("timers", rv);

这一行:

io.of('/').emit("timers", rv);

它将计算结果发送给所有客户端,包括我能看到的那个,我认为它是唯一的客户端。如果不是,则消息不会到达我正在测试的客户端。我不知道为什么,因为我检查了 socket.rooms 并且套接字在 socket.sessionUserId 标识的房间中,我为服务器上的每个套接字都调用了它:

io.on("connection", (normalSocket: Socket) => {
  // a user connected

  const socket = normalSocket as SocketData;

  // join the room named by the user ID that started the socket
  socket.join(socket.sessionUserId!);

...

而不是这个:

socket.to(socket.sessionUserId!).emit("timers", rv);

或者这个:

io.of('/').emit("timers", rv);

我只需要用这个:

io.to(socket.sessionUserId!).emit("timers", rv);

相关文档是here