多核机器上的 Deno

Deno on multi-core machines

在 Node.js 中有集群模块可以利用机器上所有可用的内核,这非常棒,尤其是与节点模块 pm2 一起使用时。但是我对 Deno 的一些功能非常感兴趣,但我想知道如何在多核机器上最好地 运行 它。

我知道有些 worker 可以很好地完成特定任务,但对于普通的 Web 请求,多核机器的性能似乎有些浪费?在 Deno 中获得最大可用性和利用率的最佳策略是什么?

我有点担心,如果你只有一个进程在进行,并且有一些 CPU 密集型任务,无论出于何种原因,它都会“阻止”所有其他请求。在 node.js 集群模块会解决这个问题,因为另一个进程会处理请求,但我不确定如何在 Deno 中处理这个问题?

我认为你可以 运行 在不同端口上的 Deno 中的多个实例,然后在它前面有某种负载均衡器,但相比之下,这似乎是一个相当复杂的设置。我还了解到您可以使用 Deno Deploy 之类的某种服务,但我已经有了想要 运行 它的硬件。

我有哪些选择? 在此先感谢您的明智建议和更好的智慧。

在 Deno 中,就像在网络浏览器中一样,您应该能够 use Web Workers to utilize 100% of a multi-core CPU

在集群中,您需要一个“管理器”节点(它本身也可以是一个工作节点,如 needed/appropriate)。以类似的方式,Web Worker API 可用于根据需要创建任意数量的专用 worker。这意味着主线程永远不应该阻塞,因为它可以将所有可能阻塞的任务委托给它的工作线程。不会阻塞的任务(例如简单的数据库或其他 I/O 绑定调用)可以像平常一样直接在主线程上完成。

Deno 还支持 navigator.hardwareConcurrency,因此您可以查询可用硬件并相应地确定所需工作人员的数量。不过,您可能不需要定义任何限制。从与先前生成的专用 worker 相同的来源生成新的专用 worker 可能足够快,可以按需执行此操作。即便如此,重用专门的工作人员而不是为每个请求生成一个新工作人员可能是有价值的。

使用 Transferable Objects large data sets can be made available to/from workers without copying the data. This along with messaging 可以非常直接地委派任务,同时避免复制大型数据集造成的性能瓶颈。

根据您的用例,您还可以使用像 Comlink 这样的库,“它消除了思考 postMessage 的心理障碍,并隐藏了您正在与工人一起工作的事实。”

例如

main.ts

import { serve } from "https://deno.land/std@0.133.0/http/server.ts";

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

serve(async function handler(request) {
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });

  const handler = ComlinkRequestHandler.wrap(worker);

  return await handler(request);
});

worker.ts

/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
});

ComlinkRequestHandler.ts

import * as Comlink from "https://cdn.skypack.dev/comlink@4.3.1?dts";

interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: Record<string, string>;
  hasBody: boolean;
}

interface ResponseMessage extends ResponseInit {
  headers: Record<string, string>;
  hasBody: boolean;
}

export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();

      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: Object.fromEntries(request.headers.entries()),
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      const nextRequestBodyChunk = Comlink.proxy(async () => {
        if (requestBodyReader === undefined) return undefined;
        const { value } = await requestBodyReader.read();
        return value;
      });

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? new ReadableStream({
            start(controller) {
              async function push() {
                const value = await nextResponseBodyChunk();
                if (value === undefined) {
                  controller.close();
                  return;
                }
                controller.enqueue(value);
                push();
              }

              push();
            },
          })
        : null;

      return new Response(responseBodyInit, responseInit);
    };
  }

  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody
        ? {
            ...init,
            body: new ReadableStream({
              start(controller) {
                async function push() {
                  const value = await nextRequestBodyChunk();
                  if (value === undefined) {
                    controller.close();
                    return;
                  }
                  controller.enqueue(value);
                  push();
                }

                push();
              },
            }),
          }
        : init
    );
    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();
    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: Object.fromEntries(response.headers.entries()),
      status: response.status,
      statusText: response.statusText,
    };
  }

  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    if (this.#responseBodyReader === undefined) return undefined;
    const { value } = await this.#responseBodyReader.read();
    return value;
  }
}

用法示例:

% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}

可能有更好的方法来做到这一点(例如通过 Comlink.transferHandlers 并为 RequestResponse、and/or ReadableStream 注册传输处理程序)但是这个想法是一样的,甚至可以处理大的请求或响应负载,因为正文是通过消息流传输的。