将 JSON 从 REST API 流式传输到 Express 应用程序的最佳方法是什么?

What is the best approach to stream JSON from a REST API to an Express app?

我有一个 moleculer-based 微服务,它有一个输出大型 JSON 对象(大约数万个对象)的端点

这是一个结构化的 JSON 对象,我事先知道它的外观。

[ // ... tens of thousands of these
  {
    "fileSize": 1155624,
    "name": "Gyo v1-001.jpg",
    "path": "./userdata/expanded/Gyo v01 (2003)"
  },
  {
    "fileSize": 308145,
    "name": "Gyo v1-002.jpg",
    "path": "./userdata/expanded/Gyo v01 (2003) (Digital)"
  }
  // ... tens of thousands of these
]

我开始研究 JSON 流式传输,并在那里取得了一些进展,因为我知道如何使用 NodeJS ReadableStream 客户端。我知道我可以使用 oboe 来解析 JSON 流。

为此,这是我基于 Express 的应用程序中的代码。


router.route("/getComicCovers").post(async (req: Request, res: Response) => {
  typeof req.body.extractionOptions === "object"
    ? req.body.extractionOptions
    : {};
  oboe({
    url: "http://localhost:3000/api/import/getComicCovers",
    method: "POST",
    body: {
      extractionOptions: req.body.extractionOptions,
      walkedFolders: req.body.walkedFolders,
    },
  }).on("node", ".*", (data) => {
    console.log(data);
    res.write(JSON.stringify(data));
  });
});

这是moleculer

中的端点
getComicCovers: {
    rest: "POST /getComicCovers",
    params: {
        extractionOptions: "object",
        walkedFolders: "array",
    },
    async handler(
        ctx: Context < {
            extractionOptions: IExtractionOptions;
            walkedFolders: IFolderData[];
        } >
    ) {
        
        const comicBooksForImport = await getCovers(
            ctx.params.extractionOptions,
            ctx.params.walkedFolders
        );

// comicBooksForImport is the aforementioned array of objects.
// How do I stream it from here to the Express app object-by-object?

        
    },
},

我的问题是:如何将这个巨大的 JSON 从 REST 端点流式传输到 Express 应用程序,以便我可以在客户端解析它?

更新

我根据@JuanCaicedo 的建议进行了 socket.io 实施。我在服务器端和客户端都设置了它。

但是,这段代码我确实遇到了问题

map(
    walkedFolders,
    async (folder, idx) => {
        let foo = await extractArchive(
            extractionOptions,
            folder
        );

        let fo =
            new JsonStreamStringify({
                foo,
            });

        fo.pipe(res);
        if (
            +idx ===
            walkedFolders.length - 1
        ) {
            res.end();
        }
    }
);

我收到 Error [ERR_STREAM_WRITE_AFTER_END]: write after end 错误。我知道发生这种情况是因为响应在下一次迭代尝试将 foo (这是一个流)的更新值通过管道传输到响应之前终止。

我该如何解决这个问题?

您是在寻求一般方法建议,还是寻求对您拥有的特定解决方案的支持?

如果是第一个,那么我认为在服务器和客户端之间进行通信的最佳选择是通过 websockets,也许使用 Socket.io 之类的东西。长期连接将为您提供良好的服务,因为传输所有数据需要很长时间。

然后您可以随时将数据从服务器发送到客户端。届时,您可以将服务器上的数据作为 node.js 流读取并一次发送一个数据。

使用 Oboe 并在每个节点上写入响应的问题是它需要很长的 运行 响应,并且很可能在您发送完所有数据之前连接就中断了.