为什么线程卡住了?
Why is my thread stuck?
我正在 Windows 上用 C++ 实现一个多线程下载管理器。
主线程启动下载管理线程 M,M 再启动多个下载线程 D。
每个 D 使用 cpr 库(libcurl 的 C++ 封装)
进行 HTTP 数据传输。
启动若干 D 线程后,M 进入一个循环,持续监视下载进度。
奇怪的是:一旦第二个 D 启动,或者我在 libcurl 的 CURLOPT_XFERINFOFUNCTION 回调中更新进度,M 的循环就会停止。(M 的循环里有一条调试输出,我是看到控制台输出消失才发现它停了。也许它并没有真的停止,只是进入了某种等待状态……)
M 和 D 线程都是通过 STL 的 std::thread 启动的。
这个问题折腾了我一整天,任何线索都将不胜感激……
这是 M 线程的入口函数:
// Master (M) thread entry point.
// Starts the first download thread, then wakes every 0.5 s to:
//   * delete threads marked FINISH,
//   * record progress of running threads and handle finished/failed ones,
//   * update bytes_per_second_ (every 1 s) and save progress (every 2 s),
//   * hand a freed thread a new segment, or start an extra thread every 4 s
//     while fewer than max_threads_ are running.
// Ends when no download threads remain, on error, or when the recorder
// reports the whole file finished.
void HttpDownloader1::MasterThreadFunc_()
{
    // start the first download thread
    if (!SplitDownload_(nullptr))
    {
        status_ = Status::ERRONEOUS;
        return;
    }
    uint64_t prev_downloaded_bytes = recorder_->GetDownloadedBytes();
    // loop_count advances exactly once per 0.5 s tick, and the periodic work
    // below is scheduled on its low bits. It used to be incremented twice per
    // iteration (in the for-header and again at the top of the body). That
    // kept it odd, so every periodic task ran twice as often as its comment
    // says; in particular bytes_per_second_ was measured over 0.5 s (halved).
    for (int loop_count = 0; download_threads_.size() > 0; ++loop_count)
    {
#ifdef _DEBUG
        Debug_("main loop, threads: " + std::to_string(download_threads_.size()));
#endif
        std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 0.5s
        // A thread that can be given new work this tick (only the last one
        // found in this pass is reused).
        DownloadThread* splitable_segment = nullptr;
        auto it = download_threads_.begin();
        while (it != download_threads_.end())
        {
            DownloadThread* thread = *it;
            // NOTE(review): status_ and pos_ are written by the download thread
            // (and its libcurl progress callback) while being read here; unless
            // they are std::atomic (declaration not visible), this is a data race.
            if (thread->status_ == Status::FINISH)
            {
                // SplitDownload_ found no segment left for this thread.
                delete thread;
                it = download_threads_.erase(it);
                continue;
            }
            switch (thread->status_)
            {
            case Status::RUNNING:
            default:
                // Record the bytes written so far: [begin_, pos_ - 1].
                recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                // part of this segment may have been splited to other download thread
                //thread->end_ = recorder_->GetSegmentEnd(thread->pos_);
                break;
            case Status::SUCCESSFUL:
                if (recorder_->IsInitialized())
                {
                    thread->CloseFile();
                    recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                }
                else
                {
                    // Recorder not initialized yet: PrepareFile_ is expected to
                    // set up the target from this thread's result (TODO confirm).
                    if (!PrepareFile_(thread))
                        status_ = Status::ERRONEOUS;
                }
                splitable_segment = thread;
                break;
            case Status::ERRONEOUS:
                if (++retry_ > kMaxHttpRetry)
                {
                    status_ = Status::ERRONEOUS;
                }
                else
                {
                    // Hand the unfinished remainder back to the recorder and
                    // let this thread be re-dispatched.
                    thread->CloseFile();
                    recorder_->MarkFailed(thread->pos_, thread->end_);
                    splitable_segment = thread;
                }
                break;
            }
            ++it;
        }
        // break out if error occurred
        if (status_ == Status::ERRONEOUS)
        {
            break;
        }
        // if download completed
        if (recorder_->IsFinish())
        {
            status_ = Status::SUCCESSFUL;
            break;
        }
        // calculate download speed every 1 second (every 2nd tick)
        if ((loop_count & 1) == 1)
        {
            auto bytes = recorder_->GetDownloadedBytes();
            bytes_per_second_ = bytes - prev_downloaded_bytes;
            prev_downloaded_bytes = bytes;
        }
        // save progress info every 2 seconds (every 4th tick)
        if ((loop_count & 3) == 3)
        {
            recorder_->Save();
        }
        // reuse a freed thread right away; otherwise add a new thread every
        // 4 seconds (every 8th tick) while below max_threads_
        if (splitable_segment != nullptr)
            SplitDownload_(splitable_segment);
        else if ((loop_count & 7) == 7 && download_threads_.size() < max_threads_)
            SplitDownload_(nullptr);
    }
    // NOTE(review): this detaches the thread's own std::thread object from
    // inside the thread; the early-return path at the top skips it.
    master_thread_.detach();
    status_ = status_ != Status::ERRONEOUS ? Status::SUCCESSFUL : status_;
}
这是 M 启动 D 线程的方式:
// Assigns work to a download thread and starts it as a detached std::thread.
// thread == nullptr creates one with CreateDownloadThread_(); otherwise the
// given thread object is reused.
//  * Recorder not initialized: the thread gets the range [0, 0].
//  * Otherwise it takes the next segment from recorder_->GetTask(). If its
//    file_ is null, the target file is opened "rb+" and positioned at the
//    segment start. If no task is left, an existing thread is marked FINISH
//    and not started.
// Returns false only when opening or seeking the target file fails (the
// thread is then marked ERRONEOUS); true otherwise.
bool HttpDownloader1::SplitDownload_(DownloadThread* thread)
{
    if (!recorder_->IsInitialized())
    {
        if (!thread)
            thread = CreateDownloadThread_();
        thread->begin_ = 0;
        thread->end_ = 0;
    }
    else
    {
        int64_t begin, end;
        if (recorder_->GetTask(&begin, &end))
        {
            // initialize this segment
            if (!thread)
                thread = CreateDownloadThread_();
            thread->begin_ = begin;
            thread->end_ = end;
            thread->pos_ = thread->begin_;
            // NOTE(review): an already-open file_ is reused as-is, without
            // seeking to begin_.
            if (thread->file_ == nullptr)
            {
                thread->file_ = fopen(target_.GetPath().c_str(), "rb+");
                if (!thread->file_)
                {
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }
                // Position the stream at the segment start. Plain fseek takes a
                // long, which is 32-bit on Windows, so offsets >= 2 GiB would be
                // truncated; use the 64-bit variant instead.
#if defined(_WIN32)
                const int seek_result = _fseeki64(thread->file_, static_cast<long long>(thread->begin_), SEEK_SET);
#else
                const int seek_result = fseeko(thread->file_, static_cast<off_t>(thread->begin_), SEEK_SET);
#endif
                if (seek_result != 0)
                {
                    // Writing from the wrong offset would corrupt the file.
                    fclose(thread->file_);
                    thread->file_ = nullptr;
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }
            }
        }
        else
        {
            // no more segment to download or split, remove this thread if it exists.
            if (thread)
                thread->status_ = Status::FINISH;
        }
    }
    if (thread && thread->status_ != Status::FINISH)
    {
        thread->status_ = Status::RUNNING;
        thread->thread_ = std::thread(&HttpDownloader1::DownloadThreadFunc_, this, thread);
        thread->thread_.detach();
    }
    return true;
}
这是 D 线程的入口函数:
// Download (D) thread entry point: performs one blocking cpr::Get for the
// thread's Range header, then publishes the outcome in thread->status_
// (SUCCESSFUL or ERRONEOUS) for the master thread to pick up.
//  * file_ == nullptr: progress reporting is off, the body goes to
//    WriteCallback_ with a null write target, and on success the response
//    headers are stored in thread->response_header_.
//  * otherwise: the body is fwrite()n into file_, and ProgressCallback_
//    updates thread->pos_ as data arrives.
void HttpDownloader1::DownloadThreadFunc_(DownloadThread* thread)
{
    cpr::Response rsp;
    if (thread->file_ == nullptr)
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 1 }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, nullptr }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, &HttpDownloader1::WriteCallback_ })
        );
    }
    else
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 0 }),
            cpr::CurlOption({ CURLOPT_XFERINFODATA, thread }),
            cpr::CurlOption({ CURLOPT_XFERINFOFUNCTION, &HttpDownloader1::ProgressCallback_ }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, thread->file_ }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, fwrite })
        );
    }
    const std::string tag = "thread:" + std::to_string(thread->id_);
    if (rsp.status_code == 0)
    {
        // No HTTP response at all (e.g. connection failure): the cpr error
        // message is the only useful diagnostic, so include it.
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: HTTP status code 0 (" + rsp.error.message + ")");
    }
    else if (rsp.status_code >= 400)
    {
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: HTTP status code " + std::to_string(rsp.status_code));
    }
    else if (rsp.error.code != cpr::ErrorCode::OK)
    {
        // NOTE(review): ProgressCallback_ returns 1 once pos_ passes end_, which
        // makes libcurl abort the transfer. Presumably cpr reports that here as a
        // non-OK error code, so an intentional stop is treated as a failure (and
        // a retry) by the master loop. Confirm.
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: " + rsp.error.message);
    }
    else
    {
        if (thread->file_ == nullptr)
            thread->response_header_ = rsp.header;
        thread->status_ = Status::SUCCESSFUL;
    }
}
这是 libcurl 的进度回调:
int HttpDownloader1::ProgressCallback_(
void* clientp, std::uint64_t dltotal, std::uint64_t dlnow, std::uint64_t ultotal, std::uint64_t ulnow
) {
auto thread = (DownloadThread*)clientp;
if (dlnow > 0)
{
thread->pos_ = thread->begin_ + dlnow;
if (thread->pos_ > thread->end_)
return 1;
}
return 0;
}
回答:因为我犯了一个愚蠢的错误……
详情:
M 线程并没有卡住(异常停止),而是陷入了死循环。
这个死循环不在 M 的主循环里,而是在 recorder_->MarkFinish(...)
调用内部:我用 while(...) {...} 遍历一个 list 容器时,
忘记在其中一个 if 分支里递增迭代器,
导致循环永远停在同一个元素上。
所以我在 recorder_->MarkFinish()
之后的代码行下的断点永远不会被命中。
我最终是怎么发现的:
我在多线程编程方面没有太多经验。抓耳挠腮了将近一整天,我以为一定是某些我不熟悉的底层细节出了问题,所以发了这个问题。
@Raymond 的评论让我重拾信心。我开始在 M 线程的主循环中逐行添加调试输出,想弄清楚它看似卡住时到底在做什么,结果发现每次出问题时,输出都在 recorder_->MarkFinish(...)
调用之后立即消失。于是我进入该函数,找到了下面这段粗心的代码:
// Body of the segment loop in recorder_->MarkFinish(start, end): shrinks every
// segment in segments_ that overlaps [start, end] and erases segments that
// become empty.
// NOTE(review): the early break assumes segments_ is sorted by begin_. Confirm.
while (it != segments_.end())
{
    Segment& seg = *it;
    if (seg.begin_ > end) break;
    if (seg.end_ < start)
    {
        // Entirely before [start, end]: skip it. This is the bug from the
        // question. The original bare `continue` here never advanced `it`,
        // so the loop spun forever on the same segment.
        ++it;
        continue;
    }
    if (start <= seg.begin_ && seg.begin_ <= end)
        seg.begin_ = end + 1;
    if (start <= seg.end_ && seg.end_ <= end)
        seg.end_ = start - 1;
    // NOTE(review): a range strictly inside a segment
    // (seg.begin_ < start && end < seg.end_) matches neither case above and is
    // left unrecorded; splitting the segment would be needed. Confirm intent.
    if (seg.end_ < seg.begin_)
        it = segments_.erase(it);
    else
        ++it;
}
希望这能帮你揭开类似问题的一些谜团。
我正在 Windows 上用 C++ 实现一个多线程下载管理器。
主线程启动下载管理线程 M,M 再启动多个下载线程 D。每个 D 使用 cpr 库(libcurl 的 C++ 封装)
进行 HTTP 数据传输。启动若干 D 线程后,M 进入一个循环,持续监视下载进度。
奇怪的是:一旦第二个 D 启动,或者我在 libcurl 的 CURLOPT_XFERINFOFUNCTION 回调中更新进度,M 的循环就会停止。(M 的循环里有一条调试输出,我是看到控制台输出消失才发现它停了。也许它并没有真的停止,只是进入了某种等待状态……)
M 和 D 线程都是通过 STL 的 std::thread 启动的。
这个问题折腾了我一整天,任何线索都将不胜感激……
这是 M 线程的入口函数:
// Master (M) thread entry point.
// Starts the first download thread, then wakes every 0.5 s to:
//   * delete threads marked FINISH,
//   * record progress of running threads and handle finished/failed ones,
//   * update bytes_per_second_ (every 1 s) and save progress (every 2 s),
//   * hand a freed thread a new segment, or start an extra thread every 4 s
//     while fewer than max_threads_ are running.
// Ends when no download threads remain, on error, or when the recorder
// reports the whole file finished.
void HttpDownloader1::MasterThreadFunc_()
{
    // start the first download thread
    if (!SplitDownload_(nullptr))
    {
        status_ = Status::ERRONEOUS;
        return;
    }
    uint64_t prev_downloaded_bytes = recorder_->GetDownloadedBytes();
    // loop_count advances exactly once per 0.5 s tick, and the periodic work
    // below is scheduled on its low bits. It used to be incremented twice per
    // iteration (in the for-header and again at the top of the body). That
    // kept it odd, so every periodic task ran twice as often as its comment
    // says; in particular bytes_per_second_ was measured over 0.5 s (halved).
    for (int loop_count = 0; download_threads_.size() > 0; ++loop_count)
    {
#ifdef _DEBUG
        Debug_("main loop, threads: " + std::to_string(download_threads_.size()));
#endif
        std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 0.5s
        // A thread that can be given new work this tick (only the last one
        // found in this pass is reused).
        DownloadThread* splitable_segment = nullptr;
        auto it = download_threads_.begin();
        while (it != download_threads_.end())
        {
            DownloadThread* thread = *it;
            // NOTE(review): status_ and pos_ are written by the download thread
            // (and its libcurl progress callback) while being read here; unless
            // they are std::atomic (declaration not visible), this is a data race.
            if (thread->status_ == Status::FINISH)
            {
                // SplitDownload_ found no segment left for this thread.
                delete thread;
                it = download_threads_.erase(it);
                continue;
            }
            switch (thread->status_)
            {
            case Status::RUNNING:
            default:
                // Record the bytes written so far: [begin_, pos_ - 1].
                recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                // part of this segment may have been splited to other download thread
                //thread->end_ = recorder_->GetSegmentEnd(thread->pos_);
                break;
            case Status::SUCCESSFUL:
                if (recorder_->IsInitialized())
                {
                    thread->CloseFile();
                    recorder_->MarkFinish(thread->begin_, thread->pos_ - 1);
                }
                else
                {
                    // Recorder not initialized yet: PrepareFile_ is expected to
                    // set up the target from this thread's result (TODO confirm).
                    if (!PrepareFile_(thread))
                        status_ = Status::ERRONEOUS;
                }
                splitable_segment = thread;
                break;
            case Status::ERRONEOUS:
                if (++retry_ > kMaxHttpRetry)
                {
                    status_ = Status::ERRONEOUS;
                }
                else
                {
                    // Hand the unfinished remainder back to the recorder and
                    // let this thread be re-dispatched.
                    thread->CloseFile();
                    recorder_->MarkFailed(thread->pos_, thread->end_);
                    splitable_segment = thread;
                }
                break;
            }
            ++it;
        }
        // break out if error occurred
        if (status_ == Status::ERRONEOUS)
        {
            break;
        }
        // if download completed
        if (recorder_->IsFinish())
        {
            status_ = Status::SUCCESSFUL;
            break;
        }
        // calculate download speed every 1 second (every 2nd tick)
        if ((loop_count & 1) == 1)
        {
            auto bytes = recorder_->GetDownloadedBytes();
            bytes_per_second_ = bytes - prev_downloaded_bytes;
            prev_downloaded_bytes = bytes;
        }
        // save progress info every 2 seconds (every 4th tick)
        if ((loop_count & 3) == 3)
        {
            recorder_->Save();
        }
        // reuse a freed thread right away; otherwise add a new thread every
        // 4 seconds (every 8th tick) while below max_threads_
        if (splitable_segment != nullptr)
            SplitDownload_(splitable_segment);
        else if ((loop_count & 7) == 7 && download_threads_.size() < max_threads_)
            SplitDownload_(nullptr);
    }
    // NOTE(review): this detaches the thread's own std::thread object from
    // inside the thread; the early-return path at the top skips it.
    master_thread_.detach();
    status_ = status_ != Status::ERRONEOUS ? Status::SUCCESSFUL : status_;
}
这是 M 启动 D 线程的方式:
// Assigns work to a download thread and starts it as a detached std::thread.
// thread == nullptr creates one with CreateDownloadThread_(); otherwise the
// given thread object is reused.
//  * Recorder not initialized: the thread gets the range [0, 0].
//  * Otherwise it takes the next segment from recorder_->GetTask(). If its
//    file_ is null, the target file is opened "rb+" and positioned at the
//    segment start. If no task is left, an existing thread is marked FINISH
//    and not started.
// Returns false only when opening or seeking the target file fails (the
// thread is then marked ERRONEOUS); true otherwise.
bool HttpDownloader1::SplitDownload_(DownloadThread* thread)
{
    if (!recorder_->IsInitialized())
    {
        if (!thread)
            thread = CreateDownloadThread_();
        thread->begin_ = 0;
        thread->end_ = 0;
    }
    else
    {
        int64_t begin, end;
        if (recorder_->GetTask(&begin, &end))
        {
            // initialize this segment
            if (!thread)
                thread = CreateDownloadThread_();
            thread->begin_ = begin;
            thread->end_ = end;
            thread->pos_ = thread->begin_;
            // NOTE(review): an already-open file_ is reused as-is, without
            // seeking to begin_.
            if (thread->file_ == nullptr)
            {
                thread->file_ = fopen(target_.GetPath().c_str(), "rb+");
                if (!thread->file_)
                {
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }
                // Position the stream at the segment start. Plain fseek takes a
                // long, which is 32-bit on Windows, so offsets >= 2 GiB would be
                // truncated; use the 64-bit variant instead.
#if defined(_WIN32)
                const int seek_result = _fseeki64(thread->file_, static_cast<long long>(thread->begin_), SEEK_SET);
#else
                const int seek_result = fseeko(thread->file_, static_cast<off_t>(thread->begin_), SEEK_SET);
#endif
                if (seek_result != 0)
                {
                    // Writing from the wrong offset would corrupt the file.
                    fclose(thread->file_);
                    thread->file_ = nullptr;
                    thread->status_ = Status::ERRONEOUS;
                    return false;
                }
            }
        }
        else
        {
            // no more segment to download or split, remove this thread if it exists.
            if (thread)
                thread->status_ = Status::FINISH;
        }
    }
    if (thread && thread->status_ != Status::FINISH)
    {
        thread->status_ = Status::RUNNING;
        thread->thread_ = std::thread(&HttpDownloader1::DownloadThreadFunc_, this, thread);
        thread->thread_.detach();
    }
    return true;
}
这是 D 线程的入口函数:
// Download (D) thread entry point: performs one blocking cpr::Get for the
// thread's Range header, then publishes the outcome in thread->status_
// (SUCCESSFUL or ERRONEOUS) for the master thread to pick up.
//  * file_ == nullptr: progress reporting is off, the body goes to
//    WriteCallback_ with a null write target, and on success the response
//    headers are stored in thread->response_header_.
//  * otherwise: the body is fwrite()n into file_, and ProgressCallback_
//    updates thread->pos_ as data arrives.
void HttpDownloader1::DownloadThreadFunc_(DownloadThread* thread)
{
    cpr::Response rsp;
    if (thread->file_ == nullptr)
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 1 }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, nullptr }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, &HttpDownloader1::WriteCallback_ })
        );
    }
    else
    {
        rsp = cpr::Get(
            cpr::Url(target_.url_.c_str()),
            cpr::ConnectTimeout(std::chrono::seconds(kConnectionTimeout)),
            cpr::Timeout(std::chrono::seconds(kTransmitTimeout)),
            cpr::VerifySsl(false),
            cpr::Header{ { "Range", thread->GetRangeHeaderString().c_str() } },
            cpr::CurlOption({ CURLOPT_NOPROGRESS, 0 }),
            cpr::CurlOption({ CURLOPT_XFERINFODATA, thread }),
            cpr::CurlOption({ CURLOPT_XFERINFOFUNCTION, &HttpDownloader1::ProgressCallback_ }),
            cpr::CurlOption({ CURLOPT_WRITEDATA, thread->file_ }),
            cpr::CurlOption({ CURLOPT_WRITEFUNCTION, fwrite })
        );
    }
    const std::string tag = "thread:" + std::to_string(thread->id_);
    if (rsp.status_code == 0)
    {
        // No HTTP response at all (e.g. connection failure): the cpr error
        // message is the only useful diagnostic, so include it.
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: HTTP status code 0 (" + rsp.error.message + ")");
    }
    else if (rsp.status_code >= 400)
    {
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: HTTP status code " + std::to_string(rsp.status_code));
    }
    else if (rsp.error.code != cpr::ErrorCode::OK)
    {
        // NOTE(review): ProgressCallback_ returns 1 once pos_ passes end_, which
        // makes libcurl abort the transfer. Presumably cpr reports that here as a
        // non-OK error code, so an intentional stop is treated as a failure (and
        // a retry) by the master loop. Confirm.
        thread->status_ = Status::ERRONEOUS;
        Log_(tag + " error: " + rsp.error.message);
    }
    else
    {
        if (thread->file_ == nullptr)
            thread->response_header_ = rsp.header;
        thread->status_ = Status::SUCCESSFUL;
    }
}
这是 libcurl 的进度回调:
int HttpDownloader1::ProgressCallback_(
void* clientp, std::uint64_t dltotal, std::uint64_t dlnow, std::uint64_t ultotal, std::uint64_t ulnow
) {
auto thread = (DownloadThread*)clientp;
if (dlnow > 0)
{
thread->pos_ = thread->begin_ + dlnow;
if (thread->pos_ > thread->end_)
return 1;
}
return 0;
}
回答:因为我犯了一个愚蠢的错误……
详情:
M 线程并没有卡住(异常停止),而是陷入了死循环。
这个死循环不在 M 的主循环里,而是在 recorder_->MarkFinish(...)
调用内部:我用 while(...) {...} 遍历一个 list 容器时,
忘记在其中一个 if 分支里递增迭代器,
导致循环永远停在同一个元素上。
所以我在 recorder_->MarkFinish()
之后的代码行下的断点永远不会被命中。
我最终是怎么发现的:
我在多线程编程方面没有太多经验。抓耳挠腮了将近一整天,我以为一定是某些我不熟悉的底层细节出了问题,所以发了这个问题。
@Raymond 的评论让我重拾信心。我开始在 M 线程的主循环中逐行添加调试输出,想弄清楚它看似卡住时到底在做什么,结果发现每次出问题时,输出都在 recorder_->MarkFinish(...)
调用之后立即消失。于是我进入该函数,找到了下面这段粗心的代码:
// Body of the segment loop in recorder_->MarkFinish(start, end): shrinks every
// segment in segments_ that overlaps [start, end] and erases segments that
// become empty.
// NOTE(review): the early break assumes segments_ is sorted by begin_. Confirm.
while (it != segments_.end())
{
    Segment& seg = *it;
    if (seg.begin_ > end) break;
    if (seg.end_ < start)
    {
        // Entirely before [start, end]: skip it. This is the bug from the
        // question. The original bare `continue` here never advanced `it`,
        // so the loop spun forever on the same segment.
        ++it;
        continue;
    }
    if (start <= seg.begin_ && seg.begin_ <= end)
        seg.begin_ = end + 1;
    if (start <= seg.end_ && seg.end_ <= end)
        seg.end_ = start - 1;
    // NOTE(review): a range strictly inside a segment
    // (seg.begin_ < start && end < seg.end_) matches neither case above and is
    // left unrecorded; splitting the segment would be needed. Confirm intent.
    if (seg.end_ < seg.begin_)
        it = segments_.erase(it);
    else
        ++it;
}
希望这能帮你揭开类似问题的一些谜团。