为什么线程卡住了?

Why is my thread stuck?

我正在使用 C++ 在 Windows 上实现多线程下载管理器。

主线程启动下载管理线程 M，M 再启动多个下载线程 D。每个 D 线程使用 cpr 库（libcurl 的 C++ 封装）进行 HTTP 数据传输。

启动一些 D 线程后，M 进入一个循环，持续监视下载进度。

奇怪的是：一旦第二个 D 线程开始运行，或者一旦 libcurl 调用了 CURLOPT_XFERINFOFUNCTION 进度回调，M 的循环就会停止。（M 的循环中有一条调试输出，我是看到控制台不再有输出才发现它停止的。也许它并没有真的停止，只是进入了某种等待状态……）

M 和 D 线程都是用标准库的 std::thread 启动的。

被这个问题折腾了一整天。任何线索将不胜感激...


这里是M线程入口:

// Entry point of the download-manager thread M.
//
// Starts the first download thread, then polls every 0.5 s until no download
// threads remain, an unrecoverable error occurs, or the recorder reports the
// whole file as finished. On each pass it:
//   - deletes threads whose status is FINISH,
//   - records the progress of running threads via recorder_->MarkFinish(),
//   - finalizes threads that ended SUCCESSFUL, or ERRONEOUS (failures are
//     counted in retry_ against kMaxHttpRetry),
// and, on a fixed schedule, updates bytes_per_second_ (every 1 s), saves the
// progress record (every 2 s) and hands out another segment (immediately when
// a thread became available, otherwise every 4 s while below max_threads_).
//
// On exit status_ is ERRONEOUS on failure and SUCCESSFUL otherwise.
void HttpDownloader1::MasterThreadFunc_()
{
    // start the first download thread
    if (!SplitDownload_(nullptr))
    {
        status_ = Status::ERRONEOUS;
        return;
    }

    uint64_t prev_downloaded_bytes = recorder_->GetDownloadedBytes();

    // loop_count advances exactly once per 0.5 s pass (in the for-statement).
    // The original also incremented it at the top of the body, which kept it
    // odd inside the body: the "1 s / 2 s / 4 s" tasks below then ran twice
    // as often, and bytes_per_second_ actually held bytes per half-second.
    for (int loop_count = 0; !download_threads_.empty(); ++loop_count)
    {
#ifdef _DEBUG
        Debug_("main loop, threads: " + std::to_string(download_threads_.size()));
#endif
        std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 0.5s

        // a thread whose transfer has ended and that can take a new segment
        DownloadThread* splitable_segment = nullptr;
        auto it = download_threads_.begin();

        while (it != download_threads_.end())
        {
            DownloadThread* thread = *it;

            if (thread->status_ == Status::FINISH)
            {
                // SplitDownload_ sets FINISH only when there is no segment
                // left, and in that case it does not start a download thread,
                // so the object can be released here.
                delete thread;
                it = download_threads_.erase(it);
                continue;
            }

            switch (thread->status_)
            {
            case Status::RUNNING:
            default:
                // record what this thread has written so far
                recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                // part of this segment may have been split to another download thread
                //thread->end_ = recorder_->GetSegmentEnd(thread->pos_);
                break;

            case Status::SUCCESSFUL:
                if (recorder_->IsInitialized())
                {
                    thread->CloseFile();
                    recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                }
                else if (!PrepareFile_(thread))
                {
                    status_ = Status::ERRONEOUS;
                }
                splitable_segment = thread;
                break;

            case Status::ERRONEOUS:
                if (++retry_ > kMaxHttpRetry)
                {
                    status_ = Status::ERRONEOUS;
                }
                else
                {
                    // give the unfinished part back to the recorder and reuse the thread
                    thread->CloseFile();
                    recorder_->MarkFailed(thread->pos_, thread->end_);
                    splitable_segment = thread;
                }
                break;
            }
            ++it;
        }

        // break out if error occurred
        if (status_ == Status::ERRONEOUS)
            break;

        // if download completed
        if (recorder_->IsFinish())
        {
            status_ = Status::SUCCESSFUL;
            break;
        }

        // calculate download speed every 1 second (every 2nd pass)
        if ((loop_count & 1) == 1)
        {
            const auto bytes = recorder_->GetDownloadedBytes();
            bytes_per_second_ = bytes - prev_downloaded_bytes;
            prev_downloaded_bytes = bytes;
        }

        // save progress info every 2 seconds (every 4th pass)
        if ((loop_count & 3) == 3)
            recorder_->Save();

        // split download when any thread is available, or every 4 seconds
        // (every 8th pass) start a new thread while below max_threads_
        if (splitable_segment != nullptr)
            SplitDownload_(splitable_segment);
        else if ((loop_count & 7) == 7 && download_threads_.size() < max_threads_)
            SplitDownload_(nullptr);
    }

    // NOTE(review): this detaches the std::thread object that represents the
    // currently running thread; confirm the owner never joins master_thread_
    // and that its assignment cannot race with this call.
    master_thread_.detach();
    if (status_ != Status::ERRONEOUS)
        status_ = Status::SUCCESSFUL;
}

这是 M 启动 D 线程的方式:

// Assigns a download segment to `thread`, or to a newly created download
// thread when `thread` is null, and starts a detached std::thread running
// DownloadThreadFunc_ for it.
//
// - Before the recorder is initialized, the thread gets begin_ = end_ = 0.
// - Otherwise the next segment comes from recorder_->GetTask(). If the thread
//   has no open file yet, the target file is opened ("rb+") and positioned at
//   the segment start.
// - If GetTask() yields nothing, an existing `thread` is marked FINISH and no
//   download thread is started.
//
// Returns false when the target file cannot be opened or positioned (the
// thread is then marked ERRONEOUS); true otherwise.
bool HttpDownloader1::SplitDownload_(DownloadThread* thread)
{
    if (!recorder_->IsInitialized())
    {
        if (!thread)
            thread = CreateDownloadThread_();
        thread->begin_ = 0;
        thread->end_ = 0;
    }
    else
    {
        int64_t begin = 0;
        int64_t end = 0;
        if (recorder_->GetTask(&begin, &end))
        {
            // initialize this segment
            if (!thread)
                thread = CreateDownloadThread_();

            thread->begin_ = begin;
            thread->end_ = end;
            thread->pos_ = thread->begin_;

            if (thread->file_ == nullptr)
            {
                thread->file_ = fopen(target_.GetPath().c_str(), "rb+");
                if (thread->file_ == nullptr)
                {
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }

#ifdef _WIN32
                // fseek() takes a long, which is 32-bit on Windows: the old
                // (long) cast truncated offsets beyond 2 GiB. Use the 64-bit seek.
                const int seek_rc = _fseeki64(thread->file_, thread->begin_, SEEK_SET);
#else
                const int seek_rc = fseek(thread->file_, static_cast<long>(thread->begin_), SEEK_SET);
#endif
                if (seek_rc != 0)
                {
                    // writing from the wrong offset would silently corrupt the file
                    fclose(thread->file_);
                    thread->file_ = nullptr;
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }
            }
        }
        else if (thread)
        {
            // no more segment to download or split, remove this thread if it exists.
            thread->status_ = Status::FINISH;
        }
    }

    if (thread && thread->status_ != Status::FINISH)
    {
        thread->status_ = Status::RUNNING;
        thread->thread_ = std::thread(&HttpDownloader1::DownloadThreadFunc_, this, thread);
        thread->thread_.detach();
    }
    return true;
}

这里是D线程入口:

// Body of one download thread D. It performs a single HTTP GET for this
// thread's byte range (Range header from thread->GetRangeHeaderString()) and
// records the outcome in thread->status_ (SUCCESSFUL or ERRONEOUS).
//
// - thread->file_ == nullptr: progress reporting is off and the body goes to
//   WriteCallback_ with a null CURLOPT_WRITEDATA. On success the response
//   headers are stored in thread->response_header_.
// - otherwise: the body is written with fwrite() straight into thread->file_,
//   and ProgressCallback_ receives `thread` to track the write position.
//
// NOTE(review): when ProgressCallback_ asks libcurl to abort, the failure
// presumably surfaces as a non-OK rsp.error and is reported as ERRONEOUS
// below. Confirm that this is intended.
void HttpDownloader1::DownloadThreadFunc_(DownloadThread* thread)
{
    cpr::Response rsp;
    if (thread->file_ == nullptr)
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 1 }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, nullptr }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, &HttpDownloader1::WriteCallback_ })
        );
    }
    else
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 0 }),
            cpr::CurlOption({ CURLOPT_XFERINFODATA, thread }),
            cpr::CurlOption({ CURLOPT_XFERINFOFUNCTION, &HttpDownloader1::ProgressCallback_ }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, thread->file_ }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, fwrite })
        );
    }

    // One shared prefix for every failure message. The original messages had
    // a typo ("erorr") and a missing space ("thread:3error: ...").
    const std::string error_prefix = "thread:" + std::to_string(thread->id_) + " error: ";

    if (rsp.status_code == 0)
    {
        // presumably no HTTP response was received at all. Keep libcurl's
        // explanation, which the original message dropped.
        thread->status_ = Status::ERRONEOUS;
        std::string message = error_prefix + "HTTP status code 0";
        if (!rsp.error.message.empty())
            message += " (" + rsp.error.message + ")";
        Log_(message);
    }
    else if (rsp.status_code >= 400)
    {
        thread->status_ = Status::ERRONEOUS;
        Log_(error_prefix + "HTTP status code " + std::to_string(rsp.status_code));
    }
    else if (rsp.error.code != cpr::ErrorCode::OK)
    {
        thread->status_ = Status::ERRONEOUS;
        Log_(error_prefix + rsp.error.message);
    }
    else
    {
        if (thread->file_ == nullptr)
            thread->response_header_ = rsp.header;
        thread->status_ = Status::SUCCESSFUL;
    }
}

这是 libcurl 的进度回调:

int HttpDownloader1::ProgressCallback_(
void* clientp, std::uint64_t dltotal, std::uint64_t dlnow, std::uint64_t ultotal, std::uint64_t ulnow
) {
auto thread = (DownloadThread*)clientp;
if (dlnow > 0)
{
    thread->pos_ = thread->begin_ + dlnow;

    if (thread->pos_ > thread->end_)
        return 1;
}
return 0;
}

回答:因为我犯了一个愚蠢的错误...

详情:
M 线程并没有卡住（异常停止），而是陷入了死循环。这个死循环不在 M 的主循环里，而是在 recorder_->MarkFinish(...) 调用内部：我用 while(...) {...} 遍历一个列表容器时，忘记在其中一个 if 分支里递增迭代器。

所以当我在 recorder_->MarkFinish() 那一行之后下断点时，断点始终没有被命中。

我最终是怎么发现的:
我在多线程编程方面经验不多。折腾了将近一整天之后，我以为一定是某些我不熟悉的底层细节在作怪，所以发了这个问题。

@Raymond 的评论让我重拾信心。我开始在 M 线程的主循环中逐行添加调试输出，试图弄清楚它看似卡住时到底在做什么，并注意到每次出问题时，输出都在 recorder_->MarkFinish(...) 调用之后立即消失。于是我进入那个函数，发现了下面这段粗心的代码：

// Removes the finished range [start, end] from segments_, presumably the
// segments that are still to be downloaded. Assumes segments_ is sorted by
// begin_ and that its segments do not overlap.
//
// BUG FIX: the original `if (seg.end_ < start) continue;` did not advance `it`.
// The loop therefore spun forever on the first segment lying entirely before
// `start`, which is what made thread M appear to hang inside MarkFinish().
while (it != segments_.end())
{
    Segment& seg = *it;

    // sorted: this and every later segment start after the range
    if (seg.begin_ > end) break;

    // segment lies entirely before the range: skip it (advance first!)
    if (seg.end_ < start)
    {
        ++it;
        continue;
    }

    // range lies strictly inside the segment: split the segment into the
    // parts before and after the range. Nothing later can overlap, so stop.
    // (The original silently ignored this case.)
    if (seg.begin_ < start && end < seg.end_)
    {
        Segment tail = seg;
        tail.begin_ = end + 1;
        seg.end_ = start - 1;
        auto after = it;
        ++after;
        segments_.insert(after, tail);
        break;
    }

    // trim whichever end(s) of the segment the range covers
    if (start <= seg.begin_ && seg.begin_ <= end)
        seg.begin_ = end + 1;
    if (start <= seg.end_ && seg.end_ <= end)
        seg.end_ = start - 1;

    // a fully covered segment ends up empty (end_ < begin_) and is removed
    if (seg.end_ < seg.begin_)
        it = segments_.erase(it);
    else
        ++it;
}

希望这能为你遇到的类似问题减少一些神秘感。