ECMAScript 中 Atomics 对象的实际用途是什么?

What's the actual use of the Atomics object in ECMAScript?

ECMAScript specification24.4.

部分定义了 Atomics 对象

在所有全局对象中,这对我来说更加晦涩,因为直到我没有阅读它的规范我才知道它的存在,而且 Google 也没有太多引用它(或者也许这个名字太笼统了,一切都被淹没了?)。

根据其官方定义

The Atomics object provides functions that operate indivisibly (atomically) on shared memory array cells as well as functions that let agents wait for and dispatch primitive events

因此它具有对象的形状,具有许多处理低级内存和调节对它的访问的方法。而且它的 public 界面让我猜想。但是,对于最终用户而言,此类对象的实际用途是什么?为什么是public?有一些有用的例子吗?

谢谢

如果您有一些复杂的计算,您可能需要 WebWorkers,以便您的主脚本在并行完成大量工作的同时继续其工作。

Atomics 解决的问题是 WebWorker 如何在彼此之间进行通信(简单、快速和可靠)。您可以阅读有关 ArrayBuffer、SharedArrayBuffer、Atomics 的内容以及如何使用它们来获得好处 here

如果出现以下情况,您不应该为此烦恼:

  • 您正在创建一些简单的东西(例如商店、论坛等)

在以下情况下您可能需要它:

  • 您想创建一些复杂或占用内存的东西(例如 figma or google drive
  • 您希望与 WebAssemblywebgl 合作,并且您希望优化性能
  • 另外,如果你想创建一些复杂的 Node.js 模块,你可能需要它
  • 或者,如果您要通过 Electron like Skype or Discord
  • 创建一个复杂的应用程序

谢谢 and 的回答!

A​​tomics 用于同步共享内存的 WebWorker。它们使对 SharedArrayBuffer 的内存访问以线程安全的方式完成。共享内存使多线程更有用,因为:

  • 无需复制数据即可将其传递给线程
  • 线程可以在不使用事件循环的情况下进行通信
  • 线程可以更快地通信

示例:

var arr = new SharedArrayBuffer(1024);

// send a reference to the memory to any number of webworkers
workers.forEach(worker => worker.postMessage(arr));

// Normally, simultaneous access to the memory from multiple threads 
// (where at least one access is a write)
// is not safe, but the Atomics methods are thread-safe.
// This adds 2 to element 0 of arr.
Atomics.add(arr, 0, 2)

SharedArrayBuffer 以前在主要浏览器上启用,但在 Spectre incident 之后它被禁用,因为共享内存允许实现纳秒精度计时器,这允许利用 spectre。

为了保证安全,浏览器需要运行页面为每个域单独处理。 Chrome 从版本 67 开始执行此操作,并在版本 68 中重新启用共享内存。

除了 Arseniy-II 和 Simon Paris 所说的之外,当您将 JavaScript 引擎嵌入到某些主机应用程序中(以在其中启用脚本)时,Atomics 也很方便。然后可以同时从不同的并发线程直接访问共享内存,无论是从 JS 还是从 C/C++ 或您的主机应用程序编写的任何语言,而不涉及 JavaScript API 访问 C/C++/其他语言方面。

原子操作是“全有或全无”的一组较小的操作。

一起来看看

let i=0;

i++

i++实际上用3个步骤评估

  1. 读取当前i
  2. i 增加 1
  3. return旧值

如果有 2 个线程执行相同的操作会怎样?他们都可以读取相同的值 1 并同时递增它。

但是这个和Javascript,不是单线程的吗?

是的! JavaScript 确实是单线程,但浏览器/节点现在允许并行使用多个 JavaScript 运行 次(Worker Threads,Web Workers)。

Chrome 和 Node(基于 v8)为每个线程创建 Isolate,它们都在自己的 context.

中 运行

罐头 share memory 的唯一方法是通过 ArrayBuffer / SharedArrayBuffer

下一个程序的输出是什么?

运行 节点 > =10(您可能需要 --experimental_worker 标志)

node example.js

const { isMainThread, Worker, workerData } = require('worker_threads');

if (isMainThread) {
  // main thread, create shared memory to share between threads
  const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);

  process.on('exit', () => {
    // print final counter
    const res = new Int32Array(shm);
    console.log(res[0]); // expected 5 * 500,000 = 2,500,000
  });
  Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
  // worker thread, iteratres 500k and doing i++
  const arr = new Int32Array(workerData);
  for (let i = 0; i < 500000; i++) {
    arr[i]++;
  }
}

输出 可能是 2,500,000 但我们不知道,在大多数情况下它不会是 2.5M,实际上,有可能你会得到相同的输出两次是非常低的,作为程序员,我们肯定不喜欢我们不知道它将如何结束的代码。

这是竞争条件的示例,其中 n 个线程相互竞争并且不以任何方式同步。

下面是Atomic操作,可以让我们从头到尾进行算术运算。

让我们稍微改变一下程序,现在 运行:

const { isMainThread, Worker, workerData } = require('worker_threads');


if (isMainThread) {
    const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
    process.on('exit', () => {
        const res = new Int32Array(shm);
        console.log(res[0]); // expected 5 * 500,000 = 2,500,000
    });
    Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
    const arr = new Int32Array(workerData);
    for (let i = 0; i < 500000; i++) {
        Atomics.add(arr, 0, 1);
    }
}

现在输出总是 2,500,000

奖金,使用 Atomics 的 Mutex

有时候,我们希望一个操作只能有1个线程同时访问,我们来看下class

class Mutex {

    /**
     * 
     * @param {Mutex} mutex 
     * @param {Int32Array} resource 
     * @param {number} onceFlagCell 
     * @param {(done)=>void} cb
     */
    static once(mutex, resource, onceFlagCell, cb) {
        if (Atomics.load(resource, onceFlagCell) === 1) {
            return;
        }
        mutex.lock();
        // maybe someone already flagged it
        if (Atomics.load(resource, onceFlagCell) === 1) {
            mutex.unlock();
            return;
        }
        cb(() => {
            Atomics.store(resource, onceFlagCell, 1);
            mutex.unlock();
        });
    }
    /**
     * 
     * @param {Int32Array} resource 
     * @param {number} cell 
     */
    constructor(resource, cell) {
        this.resource = resource;
        this.cell = cell;
        this.lockAcquired = false;
    }

    /**
     * locks the mutex
     */
    lock() {
        if (this.lockAcquired) {
            console.warn('you already acquired the lock you stupid');
            return;
        }
        const { resource, cell } = this;
        while (true) {
            // lock is already acquired, wait
            if (Atomics.load(resource, cell) > 0) {
                while ('ok' !== Atomics.wait(resource, cell, 0));
            }
            const countOfAcquiresBeforeMe = Atomics.add(resource, cell, 1);
            // someone was faster than me, try again later
            if (countOfAcquiresBeforeMe >= 1) {
                Atomics.sub(resource, cell, 1);
                continue;
            }
            this.lockAcquired = true;
            return;
        }
    }

    /**
     * unlocks the mutex
     */
    unlock() {
        if (!this.lockAcquired) {
            console.warn('you didn\'t acquire the lock you stupid');
            return;
        }
        Atomics.sub(this.resource, this.cell, 1);
        Atomics.notify(this.resource, this.cell, 1);
        this.lockAcquired = false;
    }
}

现在,您需要分配 SharedArrayBuffer 并在所有线程之间共享它们,并看到每次只有 1 个线程进入 critical section

运行 节点 > 10

node --experimental_worker example.js

const { isMainThread, Worker, workerData, threadId } = require('worker_threads');


const { promisify } = require('util');
const doSomethingFakeThatTakesTimeAndShouldBeAtomic = promisify(setTimeout);

if (isMainThread) {
    const shm = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
    Array(5).fill(null).map(() => new Worker(__filename, { workerData: shm }));
} else {
    (async () => {
        const arr = new Int32Array(workerData);
        const mutex = new Mutex(arr, 0);
        mutex.lock();
        console.log(`[${threadId}] ${new Date().toISOString()}`);
        await doSomethingFakeThatTakesTimeAndShouldBeAtomic(1000);
        mutex.unlock();
    })();
}

我已经使用 Web Worker 和 SharedArrayBuffer 编写了一个脚本来演示 Atomics 的使用:

<!DOCTYPE html><html><head></head><body><script>
   var arr = new SharedArrayBuffer(256);
   new Int16Array(arr)[0]=0;
   var workers=[];
   for (let i=0; i<1000; i++) workers.push(new Worker('worker.js'));
   workers.forEach(w => w.postMessage(new Int16Array(arr)));
</script></body></html>

然后用一个单独的文件worker.js:

// worker.js
onmessage = function(e) {
    e.data[0]++;                 // last line is 981 only? wth?!
    //Atomics.add(e.data,0,1);   // last line is exactly 1000. right...
    console.log(e.data[0]);
}

如你所见,如果没有 Atomics 保证的互斥锁,加法有时会无法正确执行。