Deno: Server Sent Events

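Server-Sent Events are a valuable tool for opening a persistent connection to a web server, over which the server can push new data to the client as soon as it becomes available.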
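For readers new to the wire format: each message on such a connection is a few `field: value` lines followed by a blank line. The snippet below is only a minimal sketch of that framing; `formatSseMessage` and its option names are made up for illustration and are not part of any library.

```javascript
// Sketch: serialise one Server-Sent Events message into its wire format.
// `formatSseMessage` and its options are illustrative names, not a library API.
function formatSseMessage(data, { event, id } = {}) {
  const lines = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  // every line of the payload gets its own `data:` field
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // a blank line terminates the message
  return lines.join('\n') + '\n\n';
}
```

A plain call such as `formatSseMessage(new Date())` yields the same `data: ...` framing that the examples in this question write by hand.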

Using this technique in Node.js is straightforward and can be done with the following code example:

#!/usr/bin/env node
'use strict';

const http = (options, listener) => require('http').createServer(listener).listen(options.port);

http({ port: 8080 }, (req, res) => {
  switch (req.url) {
    case '/server-sent-events': {
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache',
      });

      const sendDate = () => res.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      req.on('close', () => clearInterval(interval));
    } break;

    default: {
      res.writeHead(200, {
        'Content-Type': 'text/html; charset=utf-8',
      });
      res.end(`
        <!DOCTYPE html>
        <html>
          <head>
            <title>Server Sent Events</title>
            <meta charset="utf-8">
            <script>
              const sse = new EventSource('/server-sent-events');
              sse.onerror = () => document.body.innerHTML = 'Connection Error';
              sse.onmessage = ({ data }) => document.body.innerHTML = data;
            </script>
          </head>
          <body></body>
        </html>
      `);
    }
  }
});

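Unfortunately, I have not been able to achieve the same with Deno, because there is no simple write method on the request object. I assume it has to be implemented somehow using the buffered writer req.w. Could you help me complete the example code below, so that Server-Sent Events can also be used with Deno?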

#!/usr/bin/env -S deno run --allow-net

// pinned, as `listenAndServe` with `req.respond` only exists in older std releases
import { listenAndServe as http } from 'https://deno.land/std@0.76.0/http/server.ts';

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // missing steps:
      // * setup the server sent event headers
      // * create the interval and send the date periodically
      // * clear the interval when the connection gets closed
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Sent Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

Thank you very much for your support!

[Update 2021-11-04]:

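I made some progress while researching different sources (https://deno.land/std@0.76.0/http/server.ts, https://github.com/denoland/deno/issues/4817) and got one step closer to a solution. With the updated example below, at least setting up and using Server-Sent Events now works. The remaining problem (apart from cleaning up and refactoring the code) is still the safe detection of when the incoming request gets closed (see the comments in the source code below):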

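#!/usr/bin/env -S deno run --allow-net

// pinned, as `listenAndServe` with `req.respond`/`req.w` only exists in older std releases
import { listenAndServe as http } from 'https://deno.land/std@0.76.0/http/server.ts';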

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // set up a quick'n'dirty write method without error checking
      req.write = (data) => {
        req.w.write(new TextEncoder().encode(data));
        req.w.flush();
      };

      // setup the server sent event headers
      let headers = '';
      headers += 'HTTP/1.1 200 OK\r\n';
      headers += 'Connection: keep-alive\r\n';
      headers += 'Cache-Control: no-cache\r\n';
      headers += 'Content-Type: text/event-stream\r\n';
      headers += '\r\n';
      req.write(headers);

      // create the interval and send the date periodically
      const sendDate = () => req.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      // final missing step:
      // * clear the interval when the connection gets closed

      // currently dropping the connection from the client will
      // result in the error: Uncaught (in promise) BrokenPipe:
      // Broken pipe (os error 32)
      // this error also does not seem to be catchable in the 
      // req.write method above, so there needs to be another safe
      // way to prevent this error from occurring.
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Sent Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

[Update 2021-04-16]

All issues have been resolved and are posted in my accepted answer below.

Deno's http library does not support SSE out of the box, but you can use the Oak framework or implement it yourself.

import { Application, Router } from "https://deno.land/x/oak/mod.ts";

const app = new Application();
const router = new Router();

router.get('/', ctx => {
  ctx.response.body = `
    <!DOCTYPE html>
    <html>
      <head>
        <title>Server Sent Events</title>
        <meta charset="utf-8">
        <script>
          const sse = new EventSource('/server-sent-events');
          sse.onerror = () => document.body.innerHTML = 'Connection Error';
          sse.onmessage = ({ data }) => document.body.innerHTML = data;
        </script>
      </head>
      <body></body>
    </html>
  `;
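});

router.get("/server-sent-events", async (ctx) => {
  // `await` is harmless on Oak versions where sendEvents() is synchronous
  const target = await ctx.sendEvents();
  const sendDate = () => target.dispatchMessage(`${new Date()}`);
  sendDate();
  const interval = setInterval(sendDate, 1000);
  // stop the timer once the client has disconnected
  target.addEventListener("close", () => clearInterval(interval));
});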

app.use(router.routes());
await app.listen({ port: 8080 });

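In the end I found the answer to my own question. Below is the complete solution with plenty of comments, so that you get a working version of Server-Sent Events in Deno. The solution also fixes os error 32, which was caused by not catching errors thrown by the flush method of the connection writer: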

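#!/usr/bin/env -S deno run --allow-net

// imports (pinned, as `listenAndServe` with `req.respond`/`req.w` only exists
// in older releases of the standard library)
import { listenAndServe as http } from 'https://deno.land/std@0.76.0/http/server.ts';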

// commodity
const encoder = new TextEncoder();
const print = console.log;


// start the web-server
// this one allows the endpoint `/server-sent-events`, which hosts a clock that
// will be refreshed every second (the efficiency of the clock solution could of
// course be optimised, as every client gets its own clock interval, but this
// does not matter as this example wants to show how to set up and clean up a
// task for every connecting client)
// all other requests will be answered with a simple html page that subscribes
// to the sse-based clock
http({ port: 8080 }, async (req) => {
  // ip address of the client, taken from the underlying connection (the `host`
  // header would only tell us the address of this server)
  const ip = req.conn.remoteAddr.hostname;

  // determine the endpoint to access
  switch (req.url) {
    // host the server sent event based clock
    case '/server-sent-events': {
      // logging
      print(`+ Client ${ip} connected`);

      // prepare the disconnect promise. we will use this one later on to await
      // the client's disconnect, so we can properly clean up. the promise will
      // be resolved manually by us when we detect a disconnect from the client
      // on an attempt to send new data to it (unfortunately there seems to be
      // no other way to detect when the client actually closed the connection)
      let resolver;
      const disconnect = new Promise((resolve) => resolver = resolve);

      // write helper
      req.write = async (data) => {
        // buffer the data and flush the writer to actually send it to the
        // client. both calls are awaited inside the try/catch, as an unhandled
        // rejection from either of them will lead to the
        // `Broken pipe (os error 32)` error
        try {
          await req.w.write(encoder.encode(data));
          await req.w.flush();
        } catch(err) {
          // throw any errors but the broken pipe, which gets thrown when the
          // client has already disconnected and we try to send him new data
          // later on
          if (err.name !== 'BrokenPipe') {
            throw err;
          }

          // close the connection from our side as well
          req.conn.close();

          // resolve our `disconnect` promise, so we can clean up
          resolver();
        }
      };

      // date writer (interval method which pushes the current date to the client)
      const sendDate = async () => await req.write(`data: ${new Date()}\n\n`);

      // prepare and send the headers
      let headers = '';
      headers += `HTTP/1.1 200 OK\r\n`;
      headers += `Connection: keep-alive\r\n`;
      headers += `Cache-Control: no-cache\r\n`;
      headers += `Content-Type: text/event-stream\r\n`;
      headers += `\r\n`;
      await req.write(headers);

      // send the date now for the first time and then every second
      sendDate();
      const interval = setInterval(sendDate, 1000);

      // wait until the client disconnects to clean up. so we will be "stuck"
      // here until a disconnect gets detected as we use a promise based approach
      // to detect the disconnect
      await disconnect;
      clearInterval(interval);

      // logging
      print(`- Client ${ip} disconnected`);
    } break;

    // all other requests host a simple html page which subscribes to the clock
    default: {
      print(`* Serve website to ${ip}`);
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Sent Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});
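
The disconnect handling of the write helper above can also be looked at in isolation, without a running server. The following is only a minimal sketch under stated assumptions: `createFakeWriter` is a hypothetical stand-in for `req.w` that rejects with an error named `BrokenPipe` after a given number of flushes, and `createSseWriter` is an illustrative name. Neither is part of Deno or its standard library.

```javascript
// Hypothetical stand-in for the buffered writer `req.w`: after `failAfter`
// successful flushes it rejects the way a dropped connection would.
function createFakeWriter(failAfter) {
  let flushes = 0;
  return {
    async write(_bytes) {
      // a real writer would buffer the bytes here
    },
    async flush() {
      flushes += 1;
      if (flushes > failAfter) {
        const err = new Error('Broken pipe (os error 32)');
        err.name = 'BrokenPipe';
        throw err;
      }
    },
  };
}

// Wraps a writer so that a broken pipe resolves `closed` instead of
// surfacing as an uncaught promise rejection.
function createSseWriter(writer) {
  const encoder = new TextEncoder();
  let resolveClosed;
  const closed = new Promise((resolve) => resolveClosed = resolve);

  const write = async (data) => {
    try {
      await writer.write(encoder.encode(data));
      await writer.flush();
      return true;
    } catch (err) {
      // rethrow everything but the broken pipe
      if (err.name !== 'BrokenPipe') {
        throw err;
      }
      resolveClosed();
      return false;
    }
  };

  return { write, closed };
}
```

With such a wrapper the caller can `await closed` and clear the interval afterwards, which is the same flow the server code above uses with its `disconnect` promise.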