运行 web worker 或 service worker 中的 websocket - javascript

Run websocket in web worker or service worker - javascript

我有 9 个来自不同站点的 websocket 连接,用于更新 DOM 数据。目前我正在连接所有网络套接字并监听所有网络套接字并通过函数调用更新数据。

我面临的问题是有很多 websocket 连接,存在内存和 CPU 使用问题。我如何使用 service workers 和 web workers 来优化这么多的 websocket 连接?

async function appendGatePublicTickersData(e) {
  if (e.event == "update" && e.result[0].contract == "BTC_USD") {
    if ('total_size' in e.result[0]) {
      $(".gate-btc-open-interest").html(commaNumber(e.result[0].total_size))
      if ('last' in e.result[0]) {
        $(".gate-btc-open-value").html(commaNumber(customFixedRounding((e.result[0].total_size / e.result[0].last), 4)))
      }
    }

    if ('volume_24h_usd' in e.result[0]) {
      $(".gate-btc-24-volume").html(commaNumber(e.result[0].volume_24h_usd))
    }

    if ('volume_24h_btc' in e.result[0]) {
      $(".gate-btc-24-turnover").html(commaNumber(e.result[0].volume_24h_btc))
    }

    if ('funding_rate' in e.result[0]) {
      var fundingRateBtcGate = customFixedRounding(e.result[0].funding_rate * 100, 4)
      $(".public-gate-btc-funding").html(fundingRateBtcGate)
    }

    if ('funding_rate_indicative' in e.result[0]) {
      var predictedRateBtcGate = customFixedRounding(e.result[0].funding_rate_indicative * 100, 4)
      $(".public-gate-btc-predicted").html(predictedRateBtcGate)
    }
  }
}

var pubGateWs = new WebSocket("wss://fx-ws.gateio.ws/v4/ws/btc");

pubGateWs.addEventListener("open", function() {
  pubGateWs.send(JSON.stringify({
    "time": 123456,
    "channel": "futures.tickers",
    "event": "subscribe",
    "payload": ["BTC_USD", "ETH_USD"]
  }))
});

pubGateWs.addEventListener("message", function(e) {
  e = JSON.parse(e.data)
  appendGatePublicTickersData(e)
});

pubGateWs.addEventListener("close", function() {});

由于您使用的是 Web 套接字,因此最好使用 SharedWorker 为您的 Web 套接字创建一个新线程。普通 WebWorkerSharedWorker 之间的区别在于,网络工作者将在加载页面时在每个选项卡或浏览器中创建一个新会话,而共享工作者将在每个选项卡中使用相同的会话。因此,您的所有选项卡或 windows 都将使用相同的工作程序和相同的 Web Socket 连接。

如果数据更新非常频繁(每秒超过 60 次)并且每次发生时 DOM 都必须更新,那么使用 requestAnimationFrame 方法来限制更新的数量DOM 正在更新。它将等待下一个重绘周期,然后再用新内容更新 DOM,大约每秒 60 次,或 60FPS。

这个的实现就像下面的例子:

主线程。

// Create shared worker.
const webSocketWorker = new SharedWorker('web-sockets-worker.js');

/**
 * Sends a message to the worker and passes that to the Web Socket.
 * @param {any} message 
 */
const sendMessageToSocket = message => {
  webSocketWorker.port.postMessage({ 
    action: 'send', 
    value: message,
  });
};

// Event to listen for incoming data from the worker and update the DOM.
webSocketWorker.port.addEventListener('message', ({ data }) => {
  requestAnimationFrame(() => {
    appendGatePublicTickersData(data);
  });
});
  
// Initialize the port connection.
webSocketWorker.port.start();

// Remove the current worker port from the connected ports list.
// This way your connectedPorts list stays true to the actual connected ports, 
// as they array won't get automatically updated when a port is disconnected.
window.addEventListener('beforeunload', () => {
  webSocketWorker.port.postMessage({ 
    action: 'unload', 
    value: null,
  });

  webSocketWorker.port.close();
});

共享工作者。

/**
 * Array to store all the connected ports in.
 */
const connectedPorts = [];

// Create socket instance.
const socket = new WebSocket("wss://fx-ws.gateio.ws/v4/ws/btc");

// Send initial package on open.
socket.addEventListener('open', () => {
  const package = JSON.stringify({
    "time": 123456,
    "channel": "futures.tickers",
    "event": "subscribe",
    "payload": ["BTC_USD", "ETH_USD"]
  });

  socket.send(package);
});

// Send data from socket to all open tabs.
socket.addEventListener('message', ({ data }) => {
  const package = JSON.parse(data);
  connectedPorts.forEach(port => port.postMessage(package));
});

/**
 * When a new thread is connected to the shared worker,
 * start listening for messages from the new thread.
 */
self.addEventListener('connect', ({ ports }) => {
  const port = ports[0];

  // Add this new port to the list of connected ports.
  connectedPorts.push(port);

  /**
   * Receive data from main thread and determine which
   * actions it should take based on the received data.
   */
  port.addEventListener('message', ({ data }) => {
    const { action, value } = data;

    // Send message to socket.
    if (action === 'send') {
      socket.send(JSON.stringify(value));

    // Remove port from connected ports list.
    } else if (action === 'unload') {
      const index = connectedPorts.indexOf(port);
      connectedPorts.splice(index, 1);
    }
  });

  // Start the port broadcasting.
  port.start();
});

旁注:您的 appendGatePublicTickersData 没有使用 await 关键字,因此它不必是 async 函数。