为什么在两个 CPU 寄存器之间移动数据如此缓慢以致于花费了总时间的 30%?

Why is moving data between two CPU registers so slow that it costs 30% of total time?

在尝试分析和优化缓存算法时,我遇到了一些我不理解的问题。

这是 perf 报告的热点(在 Ubuntu 18.04 LTS 和 g++ 7.5 中):

仅仅是 rax 和 rdx 寄存器之间的一条“mov”指令,怎么会占到程序总运行时间的约 30%?这并不是一个大量使用寄存器的程序(它是一种 LRU 缓存近似算法,每秒最多进行约 5000 万次查找,吞吐量大约为 400MB/s;对于更大的键值对,速度肯定不会超过 RAM),所以这应该与寄存器带宽完全无关。

测试系统的 CPU 是 FX8150,配有单通道内存,带宽为 9GB/s,这比这个单线程缓存在仅使用“int”键 + “int”值对时所能达到的吞吐量要高得多。所以我想,把 RAM 排除在这个问题之外应该是安全的。此外,这条 mov 指令看起来像是 std::unordered_map 查找操作的一部分。我知道这个 CPU 比较旧,但还算不上古老;那么,会不会是编译器由于某些“支持旧 CPU”的问题而没有使用正确的指令?

重现热点的源代码:

#ifndef LRUCLOCKCACHE_H_
#define LRUCLOCKCACHE_H_

#include<vector>
#include<algorithm>
#include<unordered_map>
#include<functional>
#include<mutex>
#include<unordered_map>


/* LRU-CLOCK-second-chance implementation
 *
 * A fixed-capacity write-back cache. Keys are located through a hash map that
 * stores the index of the slot holding them; the slots themselves live in
 * parallel circular buffers (key, value, "second chance" bit, "edited" bit).
 * Two clock hands walk the buffers on every miss: one clears second-chance
 * bits, the other (started half a revolution away) picks the first slot whose
 * bit is already clear as the eviction victim.
 *
 * LruKey: type of key (std::string, int, char, size_t, objects)
 * LruValue: type of value that is bound to key (same as above)
 * ClockHandInteger: just an optional optimization to reduce memory consumption when cache size is equal to or less than 255,65535,4B-1,...
 *
 * Both LruKey and LruValue must be default-constructible and copyable because
 * every slot is pre-filled with a default key/value.
 */
template<   typename LruKey, typename LruValue,typename ClockHandInteger=size_t>
class LruClockCache
{
public:
    // Allocates the circular buffers for numElements cache slots.
    // numElements: number of slots; 0 disables caching (every access is
    //              forwarded straight to readMiss / writeMiss)
    // readMiss:    cache-miss handler for read operations; the cache calls it
    //              to fetch data from the backing-store
    //              example: [&](MyClass key){ return redis.get(key); }
    //              takes a LruKey as key, returns LruValue as value
    // writeMiss:   write-back handler; the cache calls it when an edited slot
    //              is evicted or flushed
    //              example: [&](MyClass key, MyAnotherClass value){ redis.set(key,value); }
    //              takes a LruKey as key and LruValue as value
    LruClockCache(ClockHandInteger numElements,
                const std::function<LruValue(LruKey)> & readMiss,
                const std::function<void(LruKey,LruValue)> & writeMiss)
        : size(numElements),
          valueBuffer(numElements),
          chanceToSurviveBuffer(numElements),
          isEditedBuffer(numElements),
          keyBuffer(numElements),
          loadData(readMiss),
          saveData(writeMiss),
          ctr(0),
          // 50% phase difference between eviction and second-chance hands of the "second-chance" CLOCK algorithm
          ctrEvict(numElements/2)
    {
    }

    // Returns the value bound to key.
    // On a miss the value is fetched through readMiss, stored in a recycled
    // slot and served from the cache on later accesses with the same key.
    // Declared noexcept: an exception escaping a callback terminates the program.
    inline
    const LruValue get(const LruKey & key)  noexcept
    {
        return accessClock2Hand(key,nullptr);
    }

    // Convenience wrapper: performs get() for every element of key, in order,
    // and returns the values at the matching positions.
    inline
    const std::vector<LruValue> getMultiple(const std::vector<LruKey> & key)  noexcept
    {
        // size_t index: an int would truncate for very large key lists
        const size_t total = key.size();
        std::vector<LruValue> result(total);

        for(size_t idx=0;idx<total;idx++)
        {
            result[idx]=accessClock2Hand(key[idx],nullptr);
        }
        return result;
    }

    // get() serialized against the other *ThreadSafe methods through an internal mutex
    inline
    const LruValue getThreadSafe(const LruKey & key)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        return accessClock2Hand(key,nullptr);
    }

    // Binds val to key inside the cache only (write-back policy).
    // The backing-store is updated later, when
    //                  another access evicts the cache slot containing this key/value
    //                  or when cache is flushed by flush() method
    // Declared noexcept: an exception escaping a callback terminates the program.
    inline
    void set(const LruKey & key, const LruValue & val) noexcept
    {
        accessClock2Hand(key,&val,1);
    }

    // set() serialized against the other *ThreadSafe methods through an internal mutex
    inline
    void setThreadSafe(const LruKey & key, const LruValue & val)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        accessClock2Hand(key,&val,1);
    }

    // Writes every edited entry to the backing-store through writeMiss.
    // Use this before closing the backing-store to store the latest bits of data.
    // Entries that were written back are also removed from the lookup table,
    // so the next access to them is a miss; unedited entries stay cached.
    void flush()
    {
        for (auto mp = mapping.cbegin(); mp != mapping.cend() /* not hoisted: erase changes the map */; /* advanced in body */)
        {
            const ClockHandInteger slot = mp->second;
            if (isEditedBuffer[slot] == 1)
            {
                saveData(keyBuffer[slot],valueBuffer[slot]);
                isEditedBuffer[slot]=0;
                mp = mapping.erase(mp);
            }
            else
            {
                ++mp;
            }
        }
    }

    // CLOCK algorithm with 2 hand counters (1 for second chance for a cache slot to survive, 1 for eviction of cache slot)
    // key:    key being read or written
    // value:  value to store; only dereferenced when opType is 1 (may be nullptr for a read)
    // opType=0: get
    // opType=1: set
    // returns the value now bound to key
    LruValue const accessClock2Hand(const LruKey & key,const LruValue * value, const bool opType = 0)
    {
        // cache-hit: mark the slot as recently used and serve it
        const auto hit = mapping.find(key);
        if(hit!=mapping.end())
        {
            const ClockHandInteger slot = hit->second;
            chanceToSurviveBuffer[slot]=1;
            if(opType)
            {
                isEditedBuffer[slot]=1;
                valueBuffer[slot]=*value;
            }
            return valueBuffer[slot];
        }

        // no slots at all: nothing can be cached, talk to the backing-store directly
        if(size==0)
        {
            if(opType)
            {
                saveData(key,*value);
                return *value;
            }
            return loadData(key);
        }

        // cache-miss: advance both hands until the eviction hand meets a slot
        // whose second-chance bit is already cleared
        ClockHandInteger victim = 0;
        for(bool found=false; !found; )
        {
            // second-chance hand: clear the bit (skip the store when already clear)
            if(chanceToSurviveBuffer[ctr]>0)
            {
                chanceToSurviveBuffer[ctr]=0;
            }
            ctr++;
            if(ctr>=size)
            {
                ctr=0; // wrap around the circular buffer
            }

            // eviction hand: a slot that was not touched since its bit was cleared loses
            if(chanceToSurviveBuffer[ctrEvict]==0)
            {
                victim=ctrEvict;
                found=true;
            }
            ctrEvict++;
            if(ctrEvict>=size)
            {
                ctrEvict=0; // wrap around the circular buffer
            }
        }

        // write-back: the victim holds data the backing-store has not seen yet
        if(isEditedBuffer[victim] == 1)
        {
            saveData(keyBuffer[victim],valueBuffer[victim]);
        }

        // obtain the new value before the lookup table is touched
        if(opType)
        {
            valueBuffer[victim]=*value;
        }
        else
        {
            valueBuffer[victim]=loadData(key);
        }

        // Drop the victim's old key from the lookup table, but only if that
        // entry still refers to this slot. A never-used slot holds a
        // default-constructed key and a flushed slot keeps its old key, so an
        // equal key may be live in a different slot and must not be erased.
        const auto stale = mapping.find(keyBuffer[victim]);
        if(stale!=mapping.end() && stale->second==victim)
        {
            mapping.erase(stale);
        }

        chanceToSurviveBuffer[victim]=0;
        isEditedBuffer[victim]= opType ? 1 : 0; // a freshly loaded value is clean, a written one is dirty
        mapping.emplace(key,victim);
        keyBuffer[victim]=key;

        return valueBuffer[victim];
    }


private:
    ClockHandInteger size;                              // number of cache slots
    std::mutex mut;                                     // guards the *ThreadSafe entry points
    std::unordered_map<LruKey,ClockHandInteger> mapping; // key -> index of the slot holding it
    std::vector<LruValue> valueBuffer;                  // per-slot cached value
    std::vector<unsigned char> chanceToSurviveBuffer;   // per-slot second-chance bit (1 = recently hit)
    std::vector<unsigned char> isEditedBuffer;          // per-slot dirty bit (1 = not yet written back)
    std::vector<LruKey> keyBuffer;                      // per-slot key
    const std::function<LruValue(LruKey)>  loadData;    // read-miss callback
    const std::function<void(LruKey,LruValue)>  saveData; // write-back callback
    ClockHandInteger ctr;                               // second-chance hand
    ClockHandInteger ctrEvict;                          // eviction hand
};



#endif /* LRUCLOCKCACHE_H_ */

测试程序:

#include <iostream>
#include <fstream>
#include <mutex>
#include <map>
#include <time.h>
#include <math.h>
#include "LruClockCache.h"
// Benchmark driver: renders a Mandelbrot image into the cache, runs 50
// smoothing passes that read/write pixels through the cache, writes the result
// as a binary PPM file and prints timing plus read/write hit ratios.
// NOTE(review): mapToReal, mapToImaginary and findMandelbrot are not defined
// in this listing; they are assumed to be declared elsewhere.
int main()
{

  // mandelbrot generation + (softening X10) using a cache

  using namespace std;

  // backing-store behind the cache
  std::map < int, int > map;

  const int imageWidth = 1024;
  const int imageHeight = 1024;
  const int maxN = 512;
  const double minR = -1.5;
  const double maxR = 0.7;
  const double minI = -1.0;
  const double maxI = 1.0;

  // access statistics: *miss counters are bumped by the cache callbacks,
  // read/write count the get()/set() calls issued below
  size_t readmiss = 0;
  size_t writemiss = 0;
  size_t read = 0;
  size_t write = 0;
  LruClockCache < int, int> cache(1024*1024+1000,
    [ & ](int key) {
      readmiss++;
      return map[key];
    },
    [ & ](int key, int value) {
      writemiss++;
      map[key] = value;
    });

  // P6 is the binary PPM flavour, so the stream must not translate bytes
  ofstream g("output_image.ppm", ios::out | ios::binary);
  g << "P6" << '\n';
  g << imageWidth << " " << imageHeight << '\n';
  g << "255" << '\n';

  const double start = clock();
  double t = 0;

  // pass 1: compute every pixel and store it through the cache
  for (int i = 0; i < imageHeight; i++) {
    for (int j = 0; j < imageWidth; j++) {
      const double cr = mapToReal(j, imageWidth, minR, maxR);
      const double ci = mapToImaginary(i, imageHeight, minI, maxI);

      cache.set(i + j * imageWidth, findMandelbrot(cr, ci, maxN));

      write++; // this is a set(), so it counts as a write
    }
  }

  // image softening (may not be accurate)
  for (int k = 0; k < 50; k++) {
    for (int i = 1; i < imageHeight - 1; i++) {
      for (int j = 1; j < imageWidth - 1; j++) {

        // 5-point average of the pixel and its four direct neighbours
        const int n0 = cache.get(i + j * imageWidth);
        const int n1 = cache.get(i + 1 + j * imageWidth);
        const int n2 = cache.get(i - 1 + j * imageWidth);
        const int n3 = cache.get(i + (j + 1) * imageWidth);
        const int n4 = cache.get(i + (j - 1) * imageWidth);
        const int n = (n0 + n1 + n2 + n3 + n4) / 5;
        cache.set(i + j * imageWidth, n);

        read += 5;
        write++;
      }
    }

  }

  // final pass: read every pixel back and emit it as an RGB triple
  for (int i = 0; i < imageHeight; i++) {
    for (int j = 0; j < imageWidth; j++) {

      const int n = cache.get(i + j * imageWidth);

      const int r = (static_cast<int>(sqrt(static_cast<double>(n))) % 256);
      const int gr = (2 * n % 256);
      const int b = (n % 256);
      read++; // this is a get(), so it counts as a read
      g << static_cast<char>(r) << static_cast<char>(gr) << static_cast<char>(b);
    }

  }
  cout << "Finished!" << endl;

  const double stop = clock();

  cout << (stop - start) / CLOCKS_PER_SEC;

  // push the remaining edited entries to the backing-store
  cache.flush();

  g.flush();

  cout << endl << t << endl;
  // hit ratios: fraction of accesses that did not reach the backing-store
  std::cout << (read - readmiss) / (double) read << std::endl;
  std::cout << (write - writemiss) / (double) write << std::endl;
  return 0;
}

编译器选项为 -O3。

这并不是在 rax 和 rdx 之间移动数据。

这是以 rdx 为索引访问 rax 所指向的数组,并将结果放入 rax。很可能是 L1 缓存未命中。