C++ google 多线程测试失败
C++ google test fail for multi threads
我在 google 测试中有以下 C++ 代码和相应的单元测试。我正在学习使用测试驱动开发进行现代 C++ 编程的书。以下代码在规模测试中崩溃。我分析的问题是代码的更新功能部分,如果我如下所示评论锁定互斥锁和 notify_all,并且将用户数量减少到 50 个而不是 5000 个,则测试用例不会崩溃。另一个观察相同的代码正在另一个测试用例“HandlesLargeNumbersOfUsers”中使用单线程或多线程。您的意见将帮助我走得更远。
void updated(const User& user) override {
// unique_lock<std::mutex> lock(mutex_);
Count++;
// wasExecuted_.notify_all();
}
class GeoServerUsersInBoxTests : public testing::Test {
public:
GeoServer server;
const double TenMeters{ 10 };
const double Width{ 2000 + TenMeters };
const double Height{ 4000 + TenMeters };
const string aUser{ "auser" };
const string bUser{ "buser" };
const string cUser{ "cuser" };
Location aUserLocation{ 38, -103 };
shared_ptr<ThreadPool> pool;
virtual void SetUp() override {
server.useThreadPool(pool);
server.track(aUser);
server.track(bUser);
server.track(cUser);
server.updateLocation(aUser, aUserLocation);
}
string userName(unsigned int i) {
return string{ "user" + to_string(i) };
}
void addUsersAt(unsigned int number, const Location& location) {
for (unsigned int i{ 0 }; i < number; i++) {
string user = userName(i);
server.track(user);
server.updateLocation(user, location);
}
}
};
class AGeoServer_ScaleTests : public GeoServerUsersInBoxTests {
public:
class GeoServerCountingListener : public GeoServerListener {
public:
void updated(const User& user) override {
unique_lock<std::mutex> lock(mutex_);
Count++;
wasExecuted_.notify_all();
}
void waitForCountAndFailOnTimeout(unsigned int expectedCount,
const milliseconds& time = milliseconds(10000)) {
unique_lock<mutex> lock(mutex_);
ASSERT_TRUE(wasExecuted_.wait_for(lock, time, [&]
{ return expectedCount == Count; }));
}
condition_variable wasExecuted_;
unsigned int Count{ 0 };
mutex mutex_;
};
GeoServerCountingListener countingListener;
shared_ptr<thread> t;
void SetUp() override {
pool = make_shared<ThreadPool>();
GeoServerUsersInBoxTests::SetUp();
}
void TearDown() override {
t->join();
}
};
TEST_F(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
pool->start(4);
const unsigned int lots{ 5000 };
addUsersAt(lots, Location{ aUserLocation.go(TenMeters, West) });
t = make_shared<thread>(
[&] { server.usersInBox(aUser, Width, Height, &countingListener); });
countingListener.waitForCountAndFailOnTimeout(lots);
}
ThreadPool.h
class ThreadPool {
public:
virtual ~ThreadPool() {
stop();
}
void stop() {
done_ = true;
for (auto& thread : threads_) thread.join();
}
void start(unsigned int numberOfThreads = 1) {
for (unsigned int i{ 0u }; i < numberOfThreads; i++)
threads_.push_back(std::thread(&ThreadPool::worker, this));
}
bool hasWork() {
std::lock_guard<std::mutex> block(mutex_);
return !workQueue_.empty();
}
virtual void add(Work work) {
std::lock_guard<std::mutex> block(mutex_);
workQueue_.push_front(work);
}
Work pullWork() {
std::lock_guard<std::mutex> block(mutex_);
if (workQueue_.empty()) return Work{};
auto work = workQueue_.back();
workQueue_.pop_back();
return work;
}
private:
void worker() {
while (!done_) {
while (!done_ && !hasWork())
;
if (done_) break;
pullWork().execute();
}
}
std::atomic<bool> done_{ false };
std::deque<Work> workQueue_;
std::shared_ptr<std::thread> workThread_;
std::mutex mutex_;
std::vector<std::thread> threads_;
};
GeoServer.cpp
void GeoServer::track(const string& user) {
locations_[user] = Location();
}
void GeoServer::stopTracking(const string& user) {
locations_.erase(user);
}
bool GeoServer::isTracking(const string& user) const {
return find(user) != locations_.end();
}
void GeoServer::updateLocation(const string& user, const Location& location) {
locations_[user] = location;
}
Location GeoServer::locationOf(const string& user) const {
if (!isTracking(user)) return Location{}; // TODO performance cost?
return find(user)->second;
}
std::unordered_map<std::string, Location>::const_iterator
GeoServer::find(const std::string& user) const {
return locations_.find(user);
}
bool GeoServer::isDifferentUserInBounds(
const pair<string, Location>& each,
const string& user,
const Area& box) const {
if (each.first == user) return false;
return box.inBounds(each.second);
}
void GeoServer::usersInBox(
const string& user, double widthInMeters, double heightInMeters,
GeoServerListener* listener) const {
auto location = locations_.find(user)->second;
Area box{ location, widthInMeters, heightInMeters };
for (auto& each : locations_) {
Work work{ [&] {
if (isDifferentUserInBounds(each, user, box))
listener->updated(User{each.first, each.second});
} };
pool_->add(work);
}
}
让我们看一下这段代码:
void GeoServer::usersInBox(
const string& user, double widthInMeters, double heightInMeters, // #4
GeoServerListener* listener) const { // #5
auto location = locations_.find(user)->second;
Area box{ location, widthInMeters, heightInMeters }; // #3
for (auto& each : locations_) {
Work work{ [&] { // #1
if (isDifferentUserInBounds(each, user, box))
listener->updated(User{each.first, each.second});
} };
pool_->add(work); // #2
}
}
第 1 行创建一个工作项。这项工作是一个具有通用引用捕获的 lambda 函数(如有必要,请查找)。 lambda 函数从外部作用域引用了四个东西:each
、user
、box
和 listener
.
第 2 行安排池中的工作项。然后继续执行;该工作将在未来某个时间执行。
当for循环完成后,函数returns。现在事情变得非常不对劲。
捕获的 box
是对在#3 处定义的局部变量的引用。这个局部变量消失了; lambda 中的引用现在是悬空的,访问它是未定义的行为。
捕获的 listener
是对在#5 处定义的函数参数的引用。这个论点消失了;参考现在悬空。你可能会想,“哦,但那是一个指针,没关系!”不是。您不是在捕获指针,而是在捕获指针的 reference。
捕获的user
本身已经是一个引用,定义在#4。虽然争论消失了,但潜在的东西还没有。在您的测试中,碰巧参数引用了 aUser
,它是测试 class 的成员,因此一直存在。但它可能是一个临时对象,一旦方法 returns 就被销毁,使捕获的引用悬空。
经验法则:不要对 lambda 使用 &
,因为您不能保证它会在您离开 lambda 的周围范围之前执行。
我在 google 测试中有以下 C++ 代码和相应的单元测试。我正在学习使用测试驱动开发进行现代 C++ 编程的书。以下代码在规模测试中崩溃。我分析的问题是代码的更新功能部分,如果我如下所示评论锁定互斥锁和 notify_all,并且将用户数量减少到 50 个而不是 5000 个,则测试用例不会崩溃。另一个观察相同的代码正在另一个测试用例“HandlesLargeNumbersOfUsers”中使用单线程或多线程。您的意见将帮助我走得更远。
void updated(const User& user) override {
// unique_lock<std::mutex> lock(mutex_);
Count++;
// wasExecuted_.notify_all();
}
class GeoServerUsersInBoxTests : public testing::Test {
public:
GeoServer server;
const double TenMeters{ 10 };
const double Width{ 2000 + TenMeters };
const double Height{ 4000 + TenMeters };
const string aUser{ "auser" };
const string bUser{ "buser" };
const string cUser{ "cuser" };
Location aUserLocation{ 38, -103 };
shared_ptr<ThreadPool> pool;
virtual void SetUp() override {
server.useThreadPool(pool);
server.track(aUser);
server.track(bUser);
server.track(cUser);
server.updateLocation(aUser, aUserLocation);
}
string userName(unsigned int i) {
return string{ "user" + to_string(i) };
}
void addUsersAt(unsigned int number, const Location& location) {
for (unsigned int i{ 0 }; i < number; i++) {
string user = userName(i);
server.track(user);
server.updateLocation(user, location);
}
}
};
class AGeoServer_ScaleTests : public GeoServerUsersInBoxTests {
public:
class GeoServerCountingListener : public GeoServerListener {
public:
void updated(const User& user) override {
unique_lock<std::mutex> lock(mutex_);
Count++;
wasExecuted_.notify_all();
}
void waitForCountAndFailOnTimeout(unsigned int expectedCount,
const milliseconds& time = milliseconds(10000)) {
unique_lock<mutex> lock(mutex_);
ASSERT_TRUE(wasExecuted_.wait_for(lock, time, [&]
{ return expectedCount == Count; }));
}
condition_variable wasExecuted_;
unsigned int Count{ 0 };
mutex mutex_;
};
GeoServerCountingListener countingListener;
shared_ptr<thread> t;
void SetUp() override {
pool = make_shared<ThreadPool>();
GeoServerUsersInBoxTests::SetUp();
}
void TearDown() override {
t->join();
}
};
TEST_F(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
pool->start(4);
const unsigned int lots{ 5000 };
addUsersAt(lots, Location{ aUserLocation.go(TenMeters, West) });
t = make_shared<thread>(
[&] { server.usersInBox(aUser, Width, Height, &countingListener); });
countingListener.waitForCountAndFailOnTimeout(lots);
}
ThreadPool.h
class ThreadPool {
public:
virtual ~ThreadPool() {
stop();
}
void stop() {
done_ = true;
for (auto& thread : threads_) thread.join();
}
void start(unsigned int numberOfThreads = 1) {
for (unsigned int i{ 0u }; i < numberOfThreads; i++)
threads_.push_back(std::thread(&ThreadPool::worker, this));
}
bool hasWork() {
std::lock_guard<std::mutex> block(mutex_);
return !workQueue_.empty();
}
virtual void add(Work work) {
std::lock_guard<std::mutex> block(mutex_);
workQueue_.push_front(work);
}
Work pullWork() {
std::lock_guard<std::mutex> block(mutex_);
if (workQueue_.empty()) return Work{};
auto work = workQueue_.back();
workQueue_.pop_back();
return work;
}
private:
void worker() {
while (!done_) {
while (!done_ && !hasWork())
;
if (done_) break;
pullWork().execute();
}
}
std::atomic<bool> done_{ false };
std::deque<Work> workQueue_;
std::shared_ptr<std::thread> workThread_;
std::mutex mutex_;
std::vector<std::thread> threads_;
};
GeoServer.cpp
void GeoServer::track(const string& user) {
locations_[user] = Location();
}
void GeoServer::stopTracking(const string& user) {
locations_.erase(user);
}
bool GeoServer::isTracking(const string& user) const {
return find(user) != locations_.end();
}
void GeoServer::updateLocation(const string& user, const Location& location) {
locations_[user] = location;
}
Location GeoServer::locationOf(const string& user) const {
if (!isTracking(user)) return Location{}; // TODO performance cost?
return find(user)->second;
}
std::unordered_map<std::string, Location>::const_iterator
GeoServer::find(const std::string& user) const {
return locations_.find(user);
}
bool GeoServer::isDifferentUserInBounds(
const pair<string, Location>& each,
const string& user,
const Area& box) const {
if (each.first == user) return false;
return box.inBounds(each.second);
}
void GeoServer::usersInBox(
const string& user, double widthInMeters, double heightInMeters,
GeoServerListener* listener) const {
auto location = locations_.find(user)->second;
Area box{ location, widthInMeters, heightInMeters };
for (auto& each : locations_) {
Work work{ [&] {
if (isDifferentUserInBounds(each, user, box))
listener->updated(User{each.first, each.second});
} };
pool_->add(work);
}
}
让我们看一下这段代码:
void GeoServer::usersInBox(
const string& user, double widthInMeters, double heightInMeters, // #4
GeoServerListener* listener) const { // #5
auto location = locations_.find(user)->second;
Area box{ location, widthInMeters, heightInMeters }; // #3
for (auto& each : locations_) {
Work work{ [&] { // #1
if (isDifferentUserInBounds(each, user, box))
listener->updated(User{each.first, each.second});
} };
pool_->add(work); // #2
}
}
第 1 行创建一个工作项。这项工作是一个具有通用引用捕获的 lambda 函数(如有必要,请查找)。 lambda 函数从外部作用域引用了四个东西:each
、user
、box
和 listener
.
第 2 行安排池中的工作项。然后继续执行;该工作将在未来某个时间执行。
当for循环完成后,函数returns。现在事情变得非常不对劲。
捕获的 box
是对在#3 处定义的局部变量的引用。这个局部变量消失了; lambda 中的引用现在是悬空的,访问它是未定义的行为。
捕获的 listener
是对在#5 处定义的函数参数的引用。这个论点消失了;参考现在悬空。你可能会想,“哦,但那是一个指针,没关系!”不是。您不是在捕获指针,而是在捕获指针的 reference。
捕获的user
本身已经是一个引用,定义在#4。虽然争论消失了,但潜在的东西还没有。在您的测试中,碰巧参数引用了 aUser
,它是测试 class 的成员,因此一直存在。但它可能是一个临时对象,一旦方法 returns 就被销毁,使捕获的引用悬空。
经验法则:不要对 lambda 使用 &
,因为您不能保证它会在您离开 lambda 的周围范围之前执行。