C++ Google Test 多线程测试失败

C++ Google Test fails with multiple threads

我有下面这段 C++ 代码以及相应的 Google Test 单元测试。我正在学习一本讲解用测试驱动开发进行现代 C++ 编程的书。下面的代码会在规模测试中崩溃。据我分析,问题出在代码的 updated 函数部分:如果我像下面这样注释掉互斥锁的加锁和 notify_all,并把用户数量从 5000 减少到 50,测试用例就不会崩溃。另一个观察是:同样的代码也被另一个测试用例“HandlesLargeNumbersOfUsers”以单线程或多线程方式使用。希望您的意见能帮助我继续前进。

// Variant of the listener callback with the mutex lock and the
// condition-variable notification commented out: only the counter is bumped.
// NOTE(review): without the lock, concurrent callers would modify Count
// unsynchronized — confirm Count's type and the calling threads.
void updated(const User& user) override {
        // unique_lock<std::mutex> lock(mutex_);
        Count++;
        // wasExecuted_.notify_all();
    }

class GeoServerUsersInBoxTests : public testing::Test {
public:
    GeoServer server;

    const double TenMeters{ 10 };
    const double Width{ 2000 + TenMeters };
    const double Height{ 4000 + TenMeters };
    const string aUser{ "auser" };
    const string bUser{ "buser" };
    const string cUser{ "cuser" };

    Location aUserLocation{ 38, -103 };

    shared_ptr<ThreadPool> pool;

    virtual void SetUp() override {
        server.useThreadPool(pool);

        server.track(aUser);
        server.track(bUser);
        server.track(cUser);

        server.updateLocation(aUser, aUserLocation);
    }

    string userName(unsigned int i) {
        return string{ "user" + to_string(i) };
    }

    void addUsersAt(unsigned int number, const Location& location) {
        for (unsigned int i{ 0 }; i < number; i++) {
            string user = userName(i);
            server.track(user);
            server.updateLocation(user, location);
        }
    }
};

// Fixture for the scale tests: extends the users-in-box fixture with a real
// ThreadPool, a thread-safe counting listener and a helper thread that issues
// the query.
class AGeoServer_ScaleTests : public GeoServerUsersInBoxTests {

public:

    // Listener that counts how many times updated() was invoked and lets the
    // test thread block until an expected count is reached.
    class GeoServerCountingListener : public GeoServerListener {
    public:
        // Increments the counter under the mutex and wakes any waiter so it
        // can re-check its predicate.
        void updated(const User& user) override {
            unique_lock<mutex> lock(mutex_);
            Count++;
            wasExecuted_.notify_all();
        }

        // Blocks until Count equals expectedCount or `time` elapses; a timeout
        // is reported as a fatal test failure through ASSERT_TRUE.
        void waitForCountAndFailOnTimeout(unsigned int expectedCount,
            const milliseconds& time = milliseconds(10000)) {
            unique_lock<mutex> lock(mutex_);
            ASSERT_TRUE(wasExecuted_.wait_for(lock, time, [&]
                { return expectedCount == Count; }));
        }

        condition_variable wasExecuted_;
        unsigned int Count{ 0 };  // guarded by mutex_
        mutex mutex_;
    };

    // NOTE(review): members of this derived fixture are destroyed before the
    // base-class members (server, pool); work still queued in the pool that
    // refers to countingListener could therefore outlive it — confirm the pool
    // is drained or stopped before the fixture is torn down.
    GeoServerCountingListener countingListener;
    shared_ptr<thread> t;

    // Creates the pool first so the base SetUp() hands a live pool to the
    // server.
    void SetUp() override {
        pool = make_shared<ThreadPool>();
        GeoServerUsersInBoxTests::SetUp();
    }

    // Joins the helper thread if the test started one. The guard matters
    // because `t` stays null when a test never reaches the point where the
    // thread is created; an unconditional t->join() would dereference null.
    void TearDown() override {
        if (t && t->joinable()) t->join();
    }
};


// Scale test: with four pool workers running, tracks 5000 extra users ten
// meters west of aUser, runs the box query on a helper thread and waits until
// the listener has been notified once per extra user (or times out).
TEST_F(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
    const unsigned int userCount{ 5000 };

    pool->start(4);
    addUsersAt(userCount, Location{ aUserLocation.go(TenMeters, West) });

    // The query only touches fixture members, which outlive the thread
    // because TearDown() joins it.
    auto query = [this] {
        server.usersInBox(aUser, Width, Height, &countingListener);
    };
    t = make_shared<thread>(query);

    countingListener.waitForCountAndFailOnTimeout(userCount);
}

ThreadPool.h

// Minimal thread pool: worker threads poll a mutex-protected deque of Work
// items and execute them until the pool is stopped.
class ThreadPool {
public:
    // Stops the workers and joins them; safe even if stop() was already
    // called explicitly.
    virtual ~ThreadPool() {
        stop();
    }

    // Signals all workers to finish and joins them. Idempotent: joining a
    // std::thread that is no longer joinable throws std::system_error, which
    // would escape the destructor if stop() had already been called once, so
    // only joinable threads are joined and the list is cleared afterwards.
    void stop() {
        done_ = true;
        for (auto& thread : threads_)
            if (thread.joinable()) thread.join();
        threads_.clear();
    }

    // Launches `numberOfThreads` additional worker threads.
    void start(unsigned int numberOfThreads = 1) {
        for (unsigned int i{ 0u }; i < numberOfThreads; i++)
            threads_.push_back(std::thread(&ThreadPool::worker, this));
    }

    // Returns true when at least one work item is queued.
    bool hasWork() {
        std::lock_guard<std::mutex> block(mutex_);
        return !workQueue_.empty();
    }

    // Enqueues a work item at the front; workers consume from the back, so
    // items run in FIFO order.
    virtual void add(Work work) {
        std::lock_guard<std::mutex> block(mutex_);
        workQueue_.push_front(work);
    }

    // Removes and returns the oldest queued item, or a default-constructed
    // Work when the queue is empty (another worker may have taken the item
    // between hasWork() and this call).
    Work pullWork() {
        std::lock_guard<std::mutex> block(mutex_);

        if (workQueue_.empty()) return Work{};

        auto work = workQueue_.back();
        workQueue_.pop_back();
        return work;
    }

private:
    // Worker loop: polls for work until done_ is set, then exits.
    void worker() {
        while (!done_) {
            // Yield while idle so polling workers do not monopolize the CPU
            // (and the queue mutex) while producers are trying to add work.
            while (!done_ && !hasWork())
                std::this_thread::yield();
            if (done_) break;
            pullWork().execute();
        }
    }

    std::atomic<bool> done_{ false };
    std::deque<Work> workQueue_;                 // guarded by mutex_
    std::shared_ptr<std::thread> workThread_;    // not used by the code in this class
    std::mutex mutex_;
    std::vector<std::thread> threads_;
};

GeoServer.cpp

// Starts tracking `user`; its stored location is (re)set to a
// default-constructed Location.
void GeoServer::track(const string& user) {
    auto& slot = locations_[user];
    slot = Location();
}

// Stops tracking `user`; does nothing when the user is not tracked.
void GeoServer::stopTracking(const string& user) {
    const auto tracked = locations_.find(user);
    if (tracked != locations_.end())
        locations_.erase(tracked);
}

// Reports whether `user` currently has an entry in the location table.
bool GeoServer::isTracking(const string& user) const {
    const auto position = find(user);
    return !(position == locations_.end());
}

// Stores `location` for `user`, creating the entry if the user was not
// tracked yet.
void GeoServer::updateLocation(const string& user, const Location& location) {
    auto& slot = locations_[user];
    slot = location;
}

// Returns the stored location of `user`, or a default-constructed Location
// when the user is not tracked. Uses a single lookup instead of checking
// isTracking() and then searching again.
Location GeoServer::locationOf(const string& user) const {
    const auto tracked = find(user);
    if (tracked == locations_.end()) return Location{};

    return tracked->second;
}

// Looks `user` up in the location table; yields locations_.end() when the
// user is not tracked.
std::unordered_map<std::string, Location>::const_iterator
GeoServer::find(const std::string& user) const {
    const auto position = locations_.find(user);
    return position;
}

// True when the entry `each` belongs to someone other than `user` and its
// location lies inside `box`. The bounds check is skipped for `user` itself.
bool GeoServer::isDifferentUserInBounds(
    const pair<string, Location>& each,
    const string& user,
    const Area& box) const {
    const bool isOtherUser = each.first != user;
    return isOtherUser && box.inBounds(each.second);
}

void GeoServer::usersInBox(
    const string& user, double widthInMeters, double heightInMeters,
    GeoServerListener* listener) const {
    auto location = locations_.find(user)->second;
    Area box{ location, widthInMeters, heightInMeters };

    for (auto& each : locations_) {
        Work work{ [&] {
           if (isDifferentUserInBounds(each, user, box))
              listener->updated(User{each.first, each.second});
        } };
        pool_->add(work);
    }
}

让我们看一下这段代码:

// Annotated copy of GeoServer::usersInBox; the "#n" markers label the lines
// discussed in the surrounding explanation. The lambda uses a default
// by-reference capture ([&]) and is handed to pool_->add(), so it refers to
// each, user, box and listener by reference.
void GeoServer::usersInBox(
    const string& user, double widthInMeters, double heightInMeters, // #4
    GeoServerListener* listener) const { // #5
    auto location = locations_.find(user)->second;
    Area box{ location, widthInMeters, heightInMeters }; // #3

    for (auto& each : locations_) {
        Work work{ [&] { // #1
           if (isDifferentUserInBounds(each, user, box))
              listener->updated(User{each.first, each.second});
        } };
        pool_->add(work); // #2
    }
}

第 1 行(#1)创建一个工作项。这个工作项是一个使用默认按引用捕获([&])的 lambda 函数(如有必要,请自行查阅)。该 lambda 函数从外部作用域引用了四样东西:each、user、box 和 listener。

第 2 行安排池中的工作项。然后继续执行;该工作将在未来某个时间执行。

当 for 循环完成后,函数返回。现在事情变得非常不对劲。

捕获的 box 是对在#3 处定义的局部变量的引用。这个局部变量消失了; lambda 中的引用现在是悬空的,访问它是未定义的行为。

捕获的 listener 是对在 #5 处定义的函数参数的引用。这个参数消失了;引用现在悬空。你可能会想,“哦,但那是一个指针,没关系!”并非如此。您捕获的不是指针本身,而是对该指针的引用。

捕获的 user 本身已经是一个引用,定义在 #4 处。虽然参数消失了,但它所引用的底层对象还在。在您的测试中,这个参数恰好引用了 aUser,它是测试类的成员,因此一直存在。但它也可能是一个临时对象,方法一返回就被销毁,使捕获的引用悬空。

经验法则:如果不能保证 lambda 会在离开其外围作用域之前执行完毕,就不要对它使用 [&] 捕获。