boost::unordered_map<int, struct> 和 shared_mutex 的线程安全
Thread safety of boost::unordered_map<int, struct> and shared_mutex
我正在尝试解析来自具有 4 个线程的套接字的 ts 流数据。我决定使用 boost shared mutex 来管理连接和数据接收。但我完全是 c++ 的新手,我不确定我是否会在胎面安全方面做对。我正在使用 boost unordered_map,当新用户连接时,我用唯一锁锁定互斥锁并将用户添加到地图,当该用户断开连接时,我正在用唯一的锁锁定互斥锁并将其从地图中删除。 TsStreams 结构在用户发送数据时包含向量和一些附加变量,我使用共享锁从映射中获取用户的 TsStreams 引用,向向量添加新数据并修改附加变量。以这种方式修改 TsStreams 是否线程安全?
class Demuxer {
public:
Demuxer();
typedef signal<void (int, TsStream)> PacketSignal;
void onUserConnected(User);
void onUserDisconnected(int);
void onUserData(Data);
void addPacketSignal(const PacketSignal::slot_type& slot);
private:
mutable PacketSignal packetSignal;
void onPacketReady(int, TsStream);
TsDemuxer tsDemuxer;
boost::unordered_map<int, TsStreams> usersData;
boost::shared_mutex mtx_;
};
#include "Demuxer.h"
Demuxer::Demuxer() {
tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}
void Demuxer::onUserConnected(User user){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(user.socket)){
usersData.erase(user.socket);
}
TsStreams streams;
streams.video.isVideo = true;
usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(socket)){
usersData.erase(socket);
}
}
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> lock(mtx_);
if(!usersData.count(data.socket)){
return;
}
tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
}
void Demuxer::onPacketReady(int socket, TsStream data) {
packetSignal(socket, data);
}
void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
packetSignal.connect(slot);
}
struct TsStreams{
TsStreams() = default;
TsStreams(const TsStreams &p1) {}
TsStream video;
TsStream audio;
};
struct TsStream
{
TsStream() = default;
TsStream(const TsStream &p1) {}
boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
uint64_t PTS = 0;
uint64_t DTS = 0;
std::vector<char> buffer;
uint32_t bytesDataLength = 0;
bool isVideo = false;
};
class TsDemuxer {
public:
typedef signal<void (int, TsStream)> PacketSignal;
void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
PacketSignal packetSignal;
void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
void parseAdaptationField(BitReader &bitReader);
void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
void parsePES(TsStream &stream, BitReader &bitReader);
int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
if(video){
streams.video.mtx_.lock();
parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
}else{
streams.audio.mtx_.lock();
parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
}
}
void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
//some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}
void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {
if(payload_unit_start_indicator)
{
if(!stream.buffer.empty()){
packetSignal(socket, stream);
stream.buffer = vector<char>();
stream.bytesDataLength = 0;
}
parsePES(stream, bitReader);
}
size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;
copy(bitReader.getBitReaderData(), bitReader.getBitReaderData()+payloadSizeBytes,back_inserter(stream.buffer));
stream.mtx_.unlock();
}
分离器在我看来是正确的。虽然有一些低效率:
您不需要在 erase
之前 count
。只是擦除。如果一个元素不存在,这将什么都不做。这为您节省了一次查找。同样,不要使用 count
后跟 at
。使用find
(使用见下文)。
您可能希望将尽可能多的工作移出关键部分。 onUserConnected
中的示例您可以在获取锁之前创建 TsStreams 对象。
请注意,更改无序映射永远不会使映射中元素的指针或引用失效,除非它们被擦除。这意味着在 onUserData
中,您不必在解析数据包时持有地图上的锁。
也就是说,假设您没有从两个不同的线程为同一用户调用 onUserData。您可以通过引入第二个锁定 TsStream 对象来防止这种情况。同样,您应该防止擦除元素,而另一个线程可能仍在解析最后一个数据包。我会为此使用 shared_ptr
。像这样:
class Demuxer {
...
boost::unordered_map<int, boost::shared_ptr<TsStreams> > usersData;
boost::shared_mutex mtx_;
};
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> maplock(mtx_);
auto found = usersData.find(data.socket);
if(found == usersData.end())
return;
boost::shared_ptr<TsStreams> stream = found->second;
boost::unique_lock<boost::recursive_mutex> datalock(stream->mtx_);
maplock.unlock();
tsDemuxer.parsePacket(data.socket, *stream, (uint8_t *) data.buffer, data.length);
}
如果您使用这种方法减少了 Demuxer 锁定的时间,您可能应该用普通互斥锁替换该共享互斥锁。共享互斥体的开销要高得多,对于这么短的关键部分来说不值得。
TsDemuxer 看起来有点不稳定:
在 TsDemuxer::parsePacket
中,您永远不会解锁互斥量。那不应该是 unique_lock
吗?同样,在 parseStream
中,解锁似乎未配对。通常,与手动锁定和解锁相比,使用 unique_lock
对象始终是可行的方法。如果有的话,锁定和解锁 unique_lock
,而不是互斥锁。
与多线程无关的备注
stream.buffer.clear()
比 stream.buffer = vector<char>()
更有效,因为这将重用缓冲区内存而不是完全释放它。
正如其他人所指出的,boost 的这些部分现在是标准库的一部分。将 boost::
替换为 std::
并启用最新的 C++ 标准,如 C++14 或 17,就可以了。在最坏的情况下,您必须将 shared_mutex
替换为 shared_timed_mutex
.
在 Demuxer 中,您按值传递 User 和 Data 对象。您确定这些不应该是 const 引用吗?
我正在尝试解析来自具有 4 个线程的套接字的 ts 流数据。我决定使用 boost shared mutex 来管理连接和数据接收。但我完全是 c++ 的新手,我不确定我是否会在胎面安全方面做对。我正在使用 boost unordered_map
class Demuxer {
public:
Demuxer();
typedef signal<void (int, TsStream)> PacketSignal;
void onUserConnected(User);
void onUserDisconnected(int);
void onUserData(Data);
void addPacketSignal(const PacketSignal::slot_type& slot);
private:
mutable PacketSignal packetSignal;
void onPacketReady(int, TsStream);
TsDemuxer tsDemuxer;
boost::unordered_map<int, TsStreams> usersData;
boost::shared_mutex mtx_;
};
#include "Demuxer.h"
Demuxer::Demuxer() {
tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}
void Demuxer::onUserConnected(User user){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(user.socket)){
usersData.erase(user.socket);
}
TsStreams streams;
streams.video.isVideo = true;
usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(socket)){
usersData.erase(socket);
}
}
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> lock(mtx_);
if(!usersData.count(data.socket)){
return;
}
tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
}
void Demuxer::onPacketReady(int socket, TsStream data) {
packetSignal(socket, data);
}
void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
packetSignal.connect(slot);
}
struct TsStreams{
TsStreams() = default;
TsStreams(const TsStreams &p1) {}
TsStream video;
TsStream audio;
};
struct TsStream
{
TsStream() = default;
TsStream(const TsStream &p1) {}
boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
uint64_t PTS = 0;
uint64_t DTS = 0;
std::vector<char> buffer;
uint32_t bytesDataLength = 0;
bool isVideo = false;
};
class TsDemuxer {
public:
typedef signal<void (int, TsStream)> PacketSignal;
void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
PacketSignal packetSignal;
void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
void parseAdaptationField(BitReader &bitReader);
void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
void parsePES(TsStream &stream, BitReader &bitReader);
int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
if(video){
streams.video.mtx_.lock();
parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
}else{
streams.audio.mtx_.lock();
parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
}
}
void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
//some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}
void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {
if(payload_unit_start_indicator)
{
if(!stream.buffer.empty()){
packetSignal(socket, stream);
stream.buffer = vector<char>();
stream.bytesDataLength = 0;
}
parsePES(stream, bitReader);
}
size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;
copy(bitReader.getBitReaderData(), bitReader.getBitReaderData()+payloadSizeBytes,back_inserter(stream.buffer));
stream.mtx_.unlock();
}
分离器在我看来是正确的。虽然有一些低效率:
您不需要在
erase
之前count
。只是擦除。如果一个元素不存在,这将什么都不做。这为您节省了一次查找。同样,不要使用count
后跟at
。使用find
(使用见下文)。您可能希望将尽可能多的工作移出关键部分。
onUserConnected
中的示例您可以在获取锁之前创建 TsStreams 对象。请注意,更改无序映射永远不会使映射中元素的指针或引用失效,除非它们被擦除。这意味着在
onUserData
中,您不必在解析数据包时持有地图上的锁。
也就是说,假设您没有从两个不同的线程为同一用户调用 onUserData。您可以通过引入第二个锁定 TsStream 对象来防止这种情况。同样,您应该防止擦除元素,而另一个线程可能仍在解析最后一个数据包。我会为此使用 shared_ptr
。像这样:
class Demuxer {
...
boost::unordered_map<int, boost::shared_ptr<TsStreams> > usersData;
boost::shared_mutex mtx_;
};
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> maplock(mtx_);
auto found = usersData.find(data.socket);
if(found == usersData.end())
return;
boost::shared_ptr<TsStreams> stream = found->second;
boost::unique_lock<boost::recursive_mutex> datalock(stream->mtx_);
maplock.unlock();
tsDemuxer.parsePacket(data.socket, *stream, (uint8_t *) data.buffer, data.length);
}
如果您使用这种方法减少了 Demuxer 锁定的时间,您可能应该用普通互斥锁替换该共享互斥锁。共享互斥体的开销要高得多,对于这么短的关键部分来说不值得。
TsDemuxer 看起来有点不稳定:
在 TsDemuxer::parsePacket
中,您永远不会解锁互斥量。那不应该是 unique_lock
吗?同样,在 parseStream
中,解锁似乎未配对。通常,与手动锁定和解锁相比,使用 unique_lock
对象始终是可行的方法。如果有的话,锁定和解锁 unique_lock
,而不是互斥锁。
与多线程无关的备注
stream.buffer.clear()
比stream.buffer = vector<char>()
更有效,因为这将重用缓冲区内存而不是完全释放它。正如其他人所指出的,boost 的这些部分现在是标准库的一部分。将
boost::
替换为std::
并启用最新的 C++ 标准,如 C++14 或 17,就可以了。在最坏的情况下,您必须将shared_mutex
替换为shared_timed_mutex
.在 Demuxer 中,您按值传递 User 和 Data 对象。您确定这些不应该是 const 引用吗?