为什么在两个 CPU 寄存器之间移动数据会如此缓慢,以至于占用了总时间的 30%?
Why is moving data between two CPU registers so slow that it costs 30% of total time?
在尝试分析和优化缓存算法时,我遇到了一些我不理解的问题。
这是 perf 报告的热点(在 Ubuntu 18.04 LTS 和 g++ 7.5 中):
仅仅是 rax 和 rdx 寄存器之间的一条“mov”指令,怎么会占用程序总运行时间的约 30%?这并不是一个寄存器密集型的程序:它是一种 LRU 缓存近似算法,每秒最多进行约 5000 万次查找,吞吐量大约为 400MB/s(对于更大的键值对,肯定不会比 RAM 更快),这应该与寄存器带宽完全无关。
测试系统的 CPU 是 FX8150,配备单通道内存,带宽为 9GB/s,这远高于这个单线程缓存在仅使用“int”键 + “int”值对时所能达到的吞吐量。所以我认为可以放心地把 RAM 排除在这个问题之外。此外,这条 mov 指令看起来是 std::unordered_map 查找操作的一部分。我知道这个 CPU 比较旧,但还算不上古董;那么,会不会是编译器因为某些“兼容旧 CPU”的问题而没有使用合适的指令?
重现热点的源代码:
#ifndef LRUCLOCKCACHE_H_
#define LRUCLOCKCACHE_H_
#include<vector>
#include<algorithm>
#include<unordered_map>
#include<functional>
#include<mutex>
#include<unordered_map>
/* LRU-CLOCK-second-chance implementation
*
* LruKey: type of key (std::string, int, char, size_t, objects)
* LruValue: type of value that is bound to key (same as above)
* ClockHandInteger: just an optional optimization to reduce memory consumption when cache size is equal to or less than 255,65535,4B-1,...
*/
template< typename LruKey, typename LruValue,typename ClockHandInteger=size_t>
class LruClockCache
{
public:
    // Write-back cache using a two-hand CLOCK ("second chance") policy.
    //
    // numElements: number of cache slots. A value of 0 disables caching and
    //              every access goes straight to the callbacks.
    // readMiss:    called on a read miss to fetch the value from the
    //              backing-store
    //              example: [&](MyClass key){ return redis.get(key); }
    //              takes a LruKey as key, returns LruValue as value
    // writeMiss:   called when an edited slot is evicted or flushed, to store
    //              the value in the backing-store
    //              example: [&](MyClass key, MyAnotherClass value){ redis.set(key,value); }
    //              takes a LruKey as key and LruValue as value
    //
    // LruKey and LruValue must be default-constructible and copyable because
    // the slot buffers are pre-filled with default values.
    LruClockCache(ClockHandInteger numElements,
                const std::function<LruValue(LruKey)> & readMiss,
                const std::function<void(LruKey,LruValue)> & writeMiss)
        : size(numElements),
          valueBuffer(numElements),
          chanceToSurviveBuffer(numElements),
          isEditedBuffer(numElements),
          keyBuffer(numElements),
          loadData(readMiss),
          saveData(writeMiss),
          ctr(0),
          // 50% phase difference between the second-chance hand and the eviction hand
          ctrEvict(numElements/2)
    {
        // the map never holds more than one entry per slot, so size it once
        mapping.reserve(numElements);
    }

    // Returns the value bound to key.
    // On a miss the value is fetched through the readMiss callback, stored in
    // a slot (possibly evicting another entry) and then returned.
    // NOTE: declared noexcept, so an exception escaping a callback or an
    // allocation terminates the program.
    inline
    const LruValue get(const LruKey & key)  noexcept
    {
        return accessClock2Hand(key,nullptr);
    }

    // Same as calling get() for every element of key, in order.
    inline
    const std::vector<LruValue> getMultiple(const std::vector<LruKey> & key)  noexcept
    {
        std::vector<LruValue> result;
        result.reserve(key.size());
        for(const LruKey & singleKey : key)
        {
            result.push_back(accessClock2Hand(singleKey,nullptr));
        }
        return result;
    }

    // get() executed while holding the internal mutex
    inline
    const LruValue getThreadSafe(const LruKey & key)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        return accessClock2Hand(key,nullptr);
    }

    // Binds val to key inside the cache only.
    // The backing-store is written later, when the slot is evicted by another
    // access or when flush() is called.
    inline
    void set(const LruKey & key, const LruValue & val) noexcept
    {
        accessClock2Hand(key,&val,1);
    }

    // set() executed while holding the internal mutex
    inline
    void setThreadSafe(const LruKey & key, const LruValue & val)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        accessClock2Hand(key,&val,1);
    }

    // Writes every edited entry to the backing-store through the writeMiss
    // callback. Use this before closing the backing-store.
    // Entries that were written are also dropped from the lookup map, so the
    // next access to them is a miss. This method does not take the mutex.
    void flush()
    {
        for (auto mp = mapping.begin(); mp != mapping.end(); /* advanced in body */)
        {
            const ClockHandInteger slot = mp->second;
            if (isEditedBuffer[slot] == 1)
            {
                isEditedBuffer[slot]=0;
                saveData(keyBuffer[slot],valueBuffer[slot]);
                mp = mapping.erase(mp); // erase returns the next valid iterator
            }
            else
            {
                ++mp;
            }
        }
    }

    // CLOCK algorithm with 2 hand counters (1 for second chance for a cache slot to survive, 1 for eviction of cache slot)
    // opType=0: get (value is ignored and may be nullptr)
    // opType=1: set (value must point to the value to store)
    // Returns the cached/loaded value for get, and *value for set.
    LruValue const accessClock2Hand(const LruKey & key,const LruValue * value, const bool opType = 0)
    {
        const bool isSet = opType;

        // A cache without slots cannot hold anything: talk to the
        // backing-store directly instead of indexing empty buffers.
        if(size == 0)
        {
            if(isSet)
            {
                saveData(key,*value);
                return *value;
            }
            return loadData(key);
        }

        // cache-hit: mark the slot as recently used and serve it
        const auto hit = mapping.find(key);
        if(hit!=mapping.end())
        {
            const ClockHandInteger slot = hit->second;
            chanceToSurviveBuffer[slot]=1;
            if(isSet)
            {
                isEditedBuffer[slot]=1;
                valueBuffer[slot]=*value;
            }
            return valueBuffer[slot];
        }

        // cache-miss: pick a slot to recycle
        const ClockHandInteger slot = selectVictimSlot();

        // write-back of the evicted entry happens before the new value is fetched
        if(isEditedBuffer[slot] == 1)
        {
            saveData(keyBuffer[slot],valueBuffer[slot]);
        }

        if(isSet)
        {
            installInSlot(slot,key,*value,1);
            return *value;
        }

        const LruValue loadedData = loadData(key);
        installInSlot(slot,key,loadedData,0);
        return loadedData;
    }

private:
    // Advances both clock hands until the eviction hand meets a slot whose
    // second-chance flag is clear, and returns that slot.
    ClockHandInteger selectVictimSlot()
    {
        for(;;)
        {
            // second-chance hand: takes the "chance" away; the slot survives
            // until the eviction hand reaches it
            if(chanceToSurviveBuffer[ctr]>0)
            {
                chanceToSurviveBuffer[ctr]=0;
            }
            ctr++;
            if(ctr>=size)
            {
                ctr=0;
            }

            // eviction hand: remember the slot it points at, then step
            const ClockHandInteger candidate = ctrEvict;
            ctrEvict++;
            if(ctrEvict>=size)
            {
                ctrEvict=0;
            }

            if(chanceToSurviveBuffer[candidate]==0)
            {
                return candidate;
            }
        }
    }

    // Replaces the content of slot with key/val and registers it in the map.
    // edited is the new dirty flag of the slot (1 for set, 0 for get).
    void installInSlot(const ClockHandInteger slot, const LruKey & key, const LruValue & val, const unsigned char edited)
    {
        // Only drop the old map entry if it really belongs to this slot.
        // keyBuffer[slot] can be stale (default key of a never-used slot, or a
        // key invalidated by flush()); erasing by key alone would remove the
        // live entry of the same key that is stored in another slot.
        const auto previous = mapping.find(keyBuffer[slot]);
        if(previous!=mapping.end() && previous->second==slot)
        {
            mapping.erase(previous);
        }

        valueBuffer[slot]=val;
        chanceToSurviveBuffer[slot]=0;
        isEditedBuffer[slot]=edited;
        mapping.emplace(key,slot);
        keyBuffer[slot]=key;
    }

    ClockHandInteger size;                              // number of slots
    std::mutex mut;                                     // used by the *ThreadSafe methods only
    std::unordered_map<LruKey,ClockHandInteger> mapping; // key -> slot index
    std::vector<LruValue> valueBuffer;                  // value stored in each slot
    std::vector<unsigned char> chanceToSurviveBuffer;   // 1 = accessed since the second-chance hand passed
    std::vector<unsigned char> isEditedBuffer;          // 1 = slot differs from the backing-store
    std::vector<LruKey> keyBuffer;                      // key stored in each slot (may be stale)
    const std::function<LruValue(LruKey)>  loadData;
    const std::function<void(LruKey,LruValue)>  saveData;
    ClockHandInteger ctr;                               // second-chance hand
    ClockHandInteger ctrEvict;                          // eviction hand
};
#endif /* LRUCLOCKCACHE_H_ */
测试程序:
#include <iostream>
#include <fstream>
#include <mutex>
#include <map>
#include <time.h>
#include <math.h>
#include "LruClockCache.h"
// Benchmark driver: renders a Mandelbrot image through the cache, smooths it
// 50 times with a 5-point stencil, writes the result as a binary PPM and
// prints the elapsed CPU time and the read/write hit ratios.
// mapToReal, mapToImaginary and findMandelbrot are not defined in the visible
// source; they are expected to be declared elsewhere.
int main()
{
    // backing-store used by the cache-miss callbacks
    std::map < int, int > backingStore;

    const int imageWidth = 1024;
    const int imageHeight = 1024;
    const int maxN = 512;
    const double minR = -1.5;
    const double maxR = 0.7;
    const double minI = -1.0;
    const double maxI = 1.0;

    size_t readmiss = 0;   // number of loads from the backing-store
    size_t writemiss = 0;  // number of write-backs to the backing-store
    size_t read = 0;       // number of cache.get calls
    size_t write = 0;      // number of cache.set calls

    LruClockCache < int, int> cache(1024*1024+1000,
        [ & ](int key) {
            readmiss++;
            return backingStore[key];
        },
        [ & ](int key, int value) {
            writemiss++;
            backingStore[key] = value;
        });

    // P6 is the binary PPM variant, so the stream must not translate newlines
    std::ofstream g("output_image.ppm", std::ios::out | std::ios::binary);
    g << "P6" << '\n';
    g << imageWidth << " " << imageHeight << '\n';
    g << "255" << '\n';

    const double start = clock();
    const double t = 0;

    // mandelbrot generation: one cache write per pixel
    for (int i = 0; i < imageHeight; i++) {
        for (int j = 0; j < imageWidth; j++) {
            const double cr = mapToReal(j, imageWidth, minR, maxR);
            const double ci = mapToImaginary(i, imageHeight, minI, maxI);
            cache.set(i + j * imageWidth, findMandelbrot(cr, ci, maxN));
            write++;
        }
    }

    // image softening (may not be accurate): 5 reads + 1 write per inner pixel
    for (int k = 0; k < 50; k++) {
        for (int i = 1; i < imageHeight - 1; i++) {
            for (int j = 1; j < imageWidth - 1; j++) {
                const int center = cache.get(i + j * imageWidth);
                const int below = cache.get(i + 1 + j * imageWidth);
                const int above = cache.get(i - 1 + j * imageWidth);
                const int right = cache.get(i + (j + 1) * imageWidth);
                const int left = cache.get(i + (j - 1) * imageWidth);
                const int n = (center + below + above + right + left) / 5;
                cache.set(i + j * imageWidth, n);
                read += 5;
                write++;
            }
        }
    }

    // output: one cache read per pixel, three colour bytes written per pixel
    for (int i = 0; i < imageHeight; i++) {
        for (int j = 0; j < imageWidth; j++) {
            const int n = cache.get(i + j * imageWidth);
            const int r = (static_cast<int>(sqrt(n)) % 256);
            const int gr = (2 * n % 256);
            const int b = (n % 256);
            read++;
            g << static_cast<char>(r) << static_cast<char>(gr) << static_cast<char>(b);
        }
    }

    std::cout << "Finished!" << std::endl;
    const double stop = clock();
    std::cout << (stop - start) / CLOCKS_PER_SEC;

    // write the remaining edited entries back before reporting
    cache.flush();
    g.flush();

    std::cout << std::endl << t << std::endl;
    // hit ratios: fraction of accesses that did not touch the backing-store
    std::cout << (read - readmiss) / (double) read << std::endl;
    std::cout << (write - writemiss) / (double) write << std::endl;
    return 0;
}
编译器选项为 -O3。
这并不是在 rax 和 rdx 之间移动数据。这条指令是以 rdx 为索引访问 rax 所指向的数组,并把结果放入 rax。很可能是 L1 缓存未命中。
在尝试分析和优化缓存算法时,我遇到了一些我不理解的问题。
这是 perf 报告的热点(在 Ubuntu 18.04 LTS 和 g++ 7.5 中):
仅仅是 rax 和 rdx 寄存器之间的一条“mov”指令,怎么会占用程序总运行时间的约 30%?这并不是一个寄存器密集型的程序:它是一种 LRU 缓存近似算法,每秒最多进行约 5000 万次查找,吞吐量大约为 400MB/s(对于更大的键值对,肯定不会比 RAM 更快),这应该与寄存器带宽完全无关。
测试系统的 CPU 是 FX8150,配备单通道内存,带宽为 9GB/s,这远高于这个单线程缓存在仅使用“int”键 + “int”值对时所能达到的吞吐量。所以我认为可以放心地把 RAM 排除在这个问题之外。此外,这条 mov 指令看起来是 std::unordered_map 查找操作的一部分。我知道这个 CPU 比较旧,但还算不上古董;那么,会不会是编译器因为某些“兼容旧 CPU”的问题而没有使用合适的指令?
重现热点的源代码:
#ifndef LRUCLOCKCACHE_H_
#define LRUCLOCKCACHE_H_
#include<vector>
#include<algorithm>
#include<unordered_map>
#include<functional>
#include<mutex>
#include<unordered_map>
/* LRU-CLOCK-second-chance implementation
*
* LruKey: type of key (std::string, int, char, size_t, objects)
* LruValue: type of value that is bound to key (same as above)
* ClockHandInteger: just an optional optimization to reduce memory consumption when cache size is equal to or less than 255,65535,4B-1,...
*/
template< typename LruKey, typename LruValue,typename ClockHandInteger=size_t>
class LruClockCache
{
public:
    // Write-back cache using a two-hand CLOCK ("second chance") policy.
    //
    // numElements: number of cache slots. A value of 0 disables caching and
    //              every access goes straight to the callbacks.
    // readMiss:    called on a read miss to fetch the value from the
    //              backing-store
    //              example: [&](MyClass key){ return redis.get(key); }
    //              takes a LruKey as key, returns LruValue as value
    // writeMiss:   called when an edited slot is evicted or flushed, to store
    //              the value in the backing-store
    //              example: [&](MyClass key, MyAnotherClass value){ redis.set(key,value); }
    //              takes a LruKey as key and LruValue as value
    //
    // LruKey and LruValue must be default-constructible and copyable because
    // the slot buffers are pre-filled with default values.
    LruClockCache(ClockHandInteger numElements,
                const std::function<LruValue(LruKey)> & readMiss,
                const std::function<void(LruKey,LruValue)> & writeMiss)
        : size(numElements),
          valueBuffer(numElements),
          chanceToSurviveBuffer(numElements),
          isEditedBuffer(numElements),
          keyBuffer(numElements),
          loadData(readMiss),
          saveData(writeMiss),
          ctr(0),
          // 50% phase difference between the second-chance hand and the eviction hand
          ctrEvict(numElements/2)
    {
        // the map never holds more than one entry per slot, so size it once
        mapping.reserve(numElements);
    }

    // Returns the value bound to key.
    // On a miss the value is fetched through the readMiss callback, stored in
    // a slot (possibly evicting another entry) and then returned.
    // NOTE: declared noexcept, so an exception escaping a callback or an
    // allocation terminates the program.
    inline
    const LruValue get(const LruKey & key)  noexcept
    {
        return accessClock2Hand(key,nullptr);
    }

    // Same as calling get() for every element of key, in order.
    inline
    const std::vector<LruValue> getMultiple(const std::vector<LruKey> & key)  noexcept
    {
        std::vector<LruValue> result;
        result.reserve(key.size());
        for(const LruKey & singleKey : key)
        {
            result.push_back(accessClock2Hand(singleKey,nullptr));
        }
        return result;
    }

    // get() executed while holding the internal mutex
    inline
    const LruValue getThreadSafe(const LruKey & key)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        return accessClock2Hand(key,nullptr);
    }

    // Binds val to key inside the cache only.
    // The backing-store is written later, when the slot is evicted by another
    // access or when flush() is called.
    inline
    void set(const LruKey & key, const LruValue & val) noexcept
    {
        accessClock2Hand(key,&val,1);
    }

    // set() executed while holding the internal mutex
    inline
    void setThreadSafe(const LruKey & key, const LruValue & val)  noexcept
    {
        std::lock_guard<std::mutex> lg(mut);
        accessClock2Hand(key,&val,1);
    }

    // Writes every edited entry to the backing-store through the writeMiss
    // callback. Use this before closing the backing-store.
    // Entries that were written are also dropped from the lookup map, so the
    // next access to them is a miss. This method does not take the mutex.
    void flush()
    {
        for (auto mp = mapping.begin(); mp != mapping.end(); /* advanced in body */)
        {
            const ClockHandInteger slot = mp->second;
            if (isEditedBuffer[slot] == 1)
            {
                isEditedBuffer[slot]=0;
                saveData(keyBuffer[slot],valueBuffer[slot]);
                mp = mapping.erase(mp); // erase returns the next valid iterator
            }
            else
            {
                ++mp;
            }
        }
    }

    // CLOCK algorithm with 2 hand counters (1 for second chance for a cache slot to survive, 1 for eviction of cache slot)
    // opType=0: get (value is ignored and may be nullptr)
    // opType=1: set (value must point to the value to store)
    // Returns the cached/loaded value for get, and *value for set.
    LruValue const accessClock2Hand(const LruKey & key,const LruValue * value, const bool opType = 0)
    {
        const bool isSet = opType;

        // A cache without slots cannot hold anything: talk to the
        // backing-store directly instead of indexing empty buffers.
        if(size == 0)
        {
            if(isSet)
            {
                saveData(key,*value);
                return *value;
            }
            return loadData(key);
        }

        // cache-hit: mark the slot as recently used and serve it
        const auto hit = mapping.find(key);
        if(hit!=mapping.end())
        {
            const ClockHandInteger slot = hit->second;
            chanceToSurviveBuffer[slot]=1;
            if(isSet)
            {
                isEditedBuffer[slot]=1;
                valueBuffer[slot]=*value;
            }
            return valueBuffer[slot];
        }

        // cache-miss: pick a slot to recycle
        const ClockHandInteger slot = selectVictimSlot();

        // write-back of the evicted entry happens before the new value is fetched
        if(isEditedBuffer[slot] == 1)
        {
            saveData(keyBuffer[slot],valueBuffer[slot]);
        }

        if(isSet)
        {
            installInSlot(slot,key,*value,1);
            return *value;
        }

        const LruValue loadedData = loadData(key);
        installInSlot(slot,key,loadedData,0);
        return loadedData;
    }

private:
    // Advances both clock hands until the eviction hand meets a slot whose
    // second-chance flag is clear, and returns that slot.
    ClockHandInteger selectVictimSlot()
    {
        for(;;)
        {
            // second-chance hand: takes the "chance" away; the slot survives
            // until the eviction hand reaches it
            if(chanceToSurviveBuffer[ctr]>0)
            {
                chanceToSurviveBuffer[ctr]=0;
            }
            ctr++;
            if(ctr>=size)
            {
                ctr=0;
            }

            // eviction hand: remember the slot it points at, then step
            const ClockHandInteger candidate = ctrEvict;
            ctrEvict++;
            if(ctrEvict>=size)
            {
                ctrEvict=0;
            }

            if(chanceToSurviveBuffer[candidate]==0)
            {
                return candidate;
            }
        }
    }

    // Replaces the content of slot with key/val and registers it in the map.
    // edited is the new dirty flag of the slot (1 for set, 0 for get).
    void installInSlot(const ClockHandInteger slot, const LruKey & key, const LruValue & val, const unsigned char edited)
    {
        // Only drop the old map entry if it really belongs to this slot.
        // keyBuffer[slot] can be stale (default key of a never-used slot, or a
        // key invalidated by flush()); erasing by key alone would remove the
        // live entry of the same key that is stored in another slot.
        const auto previous = mapping.find(keyBuffer[slot]);
        if(previous!=mapping.end() && previous->second==slot)
        {
            mapping.erase(previous);
        }

        valueBuffer[slot]=val;
        chanceToSurviveBuffer[slot]=0;
        isEditedBuffer[slot]=edited;
        mapping.emplace(key,slot);
        keyBuffer[slot]=key;
    }

    ClockHandInteger size;                              // number of slots
    std::mutex mut;                                     // used by the *ThreadSafe methods only
    std::unordered_map<LruKey,ClockHandInteger> mapping; // key -> slot index
    std::vector<LruValue> valueBuffer;                  // value stored in each slot
    std::vector<unsigned char> chanceToSurviveBuffer;   // 1 = accessed since the second-chance hand passed
    std::vector<unsigned char> isEditedBuffer;          // 1 = slot differs from the backing-store
    std::vector<LruKey> keyBuffer;                      // key stored in each slot (may be stale)
    const std::function<LruValue(LruKey)>  loadData;
    const std::function<void(LruKey,LruValue)>  saveData;
    ClockHandInteger ctr;                               // second-chance hand
    ClockHandInteger ctrEvict;                          // eviction hand
};
#endif /* LRUCLOCKCACHE_H_ */
测试程序:
#include <iostream>
#include <fstream>
#include <mutex>
#include <map>
#include <time.h>
#include <math.h>
#include "LruClockCache.h"
// Benchmark driver: renders a Mandelbrot image through the cache, smooths it
// 50 times with a 5-point stencil, writes the result as a binary PPM and
// prints the elapsed CPU time and the read/write hit ratios.
// mapToReal, mapToImaginary and findMandelbrot are not defined in the visible
// source; they are expected to be declared elsewhere.
int main()
{
    // backing-store used by the cache-miss callbacks
    std::map < int, int > backingStore;

    const int imageWidth = 1024;
    const int imageHeight = 1024;
    const int maxN = 512;
    const double minR = -1.5;
    const double maxR = 0.7;
    const double minI = -1.0;
    const double maxI = 1.0;

    size_t readmiss = 0;   // number of loads from the backing-store
    size_t writemiss = 0;  // number of write-backs to the backing-store
    size_t read = 0;       // number of cache.get calls
    size_t write = 0;      // number of cache.set calls

    LruClockCache < int, int> cache(1024*1024+1000,
        [ & ](int key) {
            readmiss++;
            return backingStore[key];
        },
        [ & ](int key, int value) {
            writemiss++;
            backingStore[key] = value;
        });

    // P6 is the binary PPM variant, so the stream must not translate newlines
    std::ofstream g("output_image.ppm", std::ios::out | std::ios::binary);
    g << "P6" << '\n';
    g << imageWidth << " " << imageHeight << '\n';
    g << "255" << '\n';

    const double start = clock();
    const double t = 0;

    // mandelbrot generation: one cache write per pixel
    for (int i = 0; i < imageHeight; i++) {
        for (int j = 0; j < imageWidth; j++) {
            const double cr = mapToReal(j, imageWidth, minR, maxR);
            const double ci = mapToImaginary(i, imageHeight, minI, maxI);
            cache.set(i + j * imageWidth, findMandelbrot(cr, ci, maxN));
            write++;
        }
    }

    // image softening (may not be accurate): 5 reads + 1 write per inner pixel
    for (int k = 0; k < 50; k++) {
        for (int i = 1; i < imageHeight - 1; i++) {
            for (int j = 1; j < imageWidth - 1; j++) {
                const int center = cache.get(i + j * imageWidth);
                const int below = cache.get(i + 1 + j * imageWidth);
                const int above = cache.get(i - 1 + j * imageWidth);
                const int right = cache.get(i + (j + 1) * imageWidth);
                const int left = cache.get(i + (j - 1) * imageWidth);
                const int n = (center + below + above + right + left) / 5;
                cache.set(i + j * imageWidth, n);
                read += 5;
                write++;
            }
        }
    }

    // output: one cache read per pixel, three colour bytes written per pixel
    for (int i = 0; i < imageHeight; i++) {
        for (int j = 0; j < imageWidth; j++) {
            const int n = cache.get(i + j * imageWidth);
            const int r = (static_cast<int>(sqrt(n)) % 256);
            const int gr = (2 * n % 256);
            const int b = (n % 256);
            read++;
            g << static_cast<char>(r) << static_cast<char>(gr) << static_cast<char>(b);
        }
    }

    std::cout << "Finished!" << std::endl;
    const double stop = clock();
    std::cout << (stop - start) / CLOCKS_PER_SEC;

    // write the remaining edited entries back before reporting
    cache.flush();
    g.flush();

    std::cout << std::endl << t << std::endl;
    // hit ratios: fraction of accesses that did not touch the backing-store
    std::cout << (read - readmiss) / (double) read << std::endl;
    std::cout << (write - writemiss) / (double) write << std::endl;
    return 0;
}
编译器选项为 -O3。
这并不是在 rax 和 rdx 之间移动数据。这条指令是以 rdx 为索引访问 rax 所指向的数组,并把结果放入 rax。很可能是 L1 缓存未命中。