boost::unordered_map<int, struct> 和 shared_mutex 的线程安全

Thread safety of boost::unordered_map<int, struct> and shared_mutex

我正在尝试解析来自具有 4 个线程的套接字的 ts 流数据。我决定使用 boost shared mutex 来管理连接和数据接收。但我完全是 c++ 的新手,我不确定我是否会在胎面安全方面做对。我正在使用 boost unordered_map,当新用户连接时,我用唯一锁锁定互斥锁并将用户添加到地图,当该用户断开连接时,我正在用唯一的锁锁定互斥锁并将其从地图中删除。 TsStreams 结构在用户发送数据时包含向量和一些附加变量,我使用共享锁从映射中获取用户的 TsStreams 引用,向向量添加新数据并修改附加变量。以这种方式修改 TsStreams 是否线程安全?

class Demuxer {

public:
    Demuxer();
    typedef signal<void (int, TsStream)> PacketSignal;
    void onUserConnected(User);
    void onUserDisconnected(int);
    void onUserData(Data);
    void addPacketSignal(const PacketSignal::slot_type& slot);
private:
    mutable PacketSignal packetSignal;
    void onPacketReady(int, TsStream);
    TsDemuxer tsDemuxer;
    boost::unordered_map<int, TsStreams> usersData;
    boost::shared_mutex mtx_;
};
#include "Demuxer.h"

Demuxer::Demuxer() {
    tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}

void Demuxer::onUserConnected(User user){
    boost::unique_lock<boost::shared_mutex> lock(mtx_);
    if(usersData.count(user.socket)){
        usersData.erase(user.socket);
    }
    TsStreams streams;
    streams.video.isVideo = true;
    usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
    boost::unique_lock<boost::shared_mutex> lock(mtx_);
    if(usersData.count(socket)){
        usersData.erase(socket);
    }
}

void Demuxer::onUserData(Data data) {
    boost::shared_lock<boost::shared_mutex> lock(mtx_);
    if(!usersData.count(data.socket)){
        return;
    }
    tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
    }

void Demuxer::onPacketReady(int socket, TsStream data) {
    packetSignal(socket, data);
}

void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
    packetSignal.connect(slot);
}
struct TsStreams{
    TsStreams() = default;
    TsStreams(const TsStreams &p1) {}
    TsStream video;
    TsStream audio;
};
struct TsStream
{
    TsStream() = default;
    TsStream(const TsStream &p1) {}
    boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
    uint64_t PTS = 0;
    uint64_t DTS = 0;
    std::vector<char> buffer;
    uint32_t bytesDataLength = 0;
    bool isVideo = false;
};
class TsDemuxer {
public:
    typedef signal<void (int, TsStream)> PacketSignal;
    void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
    connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
    PacketSignal packetSignal;
    void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
    void parseAdaptationField(BitReader &bitReader);
    void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
    void parsePES(TsStream &stream, BitReader &bitReader);
    int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
    if(video){
            streams.video.mtx_.lock();
            parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
        }else{
            streams.audio.mtx_.lock();
            parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
        }
}

void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
 //some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}

void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {

    if(payload_unit_start_indicator)
    {
        if(!stream.buffer.empty()){
            packetSignal(socket, stream);
            stream.buffer = vector<char>();
            stream.bytesDataLength = 0;
        }
        parsePES(stream, bitReader);
    }

    size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;

    copy(bitReader.getBitReaderData(), bitReader.getBitReaderData()+payloadSizeBytes,back_inserter(stream.buffer));
    stream.mtx_.unlock();
}

分离器在我看来是正确的。虽然有一些低效率:

  1. 您不需要在 erase 之前 count。只是擦除。如果一个元素不存在,这将什么都不做。这为您节省了一次查找。同样,不要使用 count 后跟 at。使用find(使用见下文)。

  2. 您可能希望将尽可能多的工作移出关键部分。 onUserConnected 中的示例您可以在获取锁之前创建 TsStreams 对象。

  3. 请注意,更改无序映射永远不会使映射中元素的指针或引用失效,除非它们被擦除。这意味着在 onUserData 中,您不必在解析数据包时持有地图上的锁。

也就是说,假设您没有从两个不同的线程为同一用户调用 onUserData。您可以通过引入第二个锁定 TsStream 对象来防止这种情况。同样,您应该防止擦除元素,而另一个线程可能仍在解析最后一个数据包。我会为此使用 shared_ptr 。像这样:

class Demuxer {
...
    boost::unordered_map<int, boost::shared_ptr<TsStreams> > usersData;
    boost::shared_mutex mtx_;
};
void Demuxer::onUserData(Data data) {
    boost::shared_lock<boost::shared_mutex> maplock(mtx_);
    auto found = usersData.find(data.socket);
    if(found == usersData.end())
        return;
    boost::shared_ptr<TsStreams> stream = found->second;
    boost::unique_lock<boost::recursive_mutex> datalock(stream->mtx_);
    maplock.unlock();
    tsDemuxer.parsePacket(data.socket, *stream, (uint8_t *) data.buffer, data.length);
}

如果您使用这种方法减少了 Demuxer 锁定的时间,您可能应该用普通互斥锁替换该共享互斥锁。共享互斥体的开销要高得多,对于这么短的关键部分来说不值得。

TsDemuxer 看起来有点不稳定:

TsDemuxer::parsePacket 中,您永远不会解锁互斥量。那不应该是 unique_lock 吗?同样,在 parseStream 中,解锁似乎未配对。通常,与手动锁定和解锁相比,使用 unique_lock 对象始终是可行的方法。如果有的话,锁定和解锁 unique_lock,而不是互斥锁。

与多线程无关的备注

  1. stream.buffer.clear()stream.buffer = vector<char>() 更有效,因为这将重用缓冲区内存而不是完全释放它。

  2. 正如其他人所指出的,boost 的这些部分现在是标准库的一部分。将 boost:: 替换为 std:: 并启用最新的 C++ 标准,如 C++14 或 17,就可以了。在最坏的情况下,您必须将 shared_mutex 替换为 shared_timed_mutex.

  3. 在 Demuxer 中,您按值传递 User 和 Data 对象。您确定这些不应该是 const 引用吗?