如何通过 zmq 多部分消息发送文件?
How to send a file by a zmq multipart message?
我正在尝试使用 ZeroMQ
基础设施发送视频,我将视频拆分成多个块来发送。当我这样做并将视频放入向量中以通过 zmq::send_multipart
发送时,我的 RAM 内存使用率非常高,因此有时会出现分段错误。
我头疼的是,当我评论发送multipart消息的行并运行程序时,vector是正常制作的,我没有得到分段错误,虽然RAM内存的消耗不是那么重.
有人可以告诉我如何发送此文件吗?
服务器代码:
#include <fstream>
#include <sstream>
#include <chrono>
#include <thread>
#include <iostream>
#include <future>
#include <zmq.hpp>
#include <zmq_addon.hpp>
using namespace std::chrono_literals;
const int size1MB = 1024 * 1024;
template <typename T>
void debug(T x)
{
std::cout << x << std::endl;
}
//Generate new chunks
std::unique_ptr<std::ofstream> createChunkFile(std::vector<std::string> &vecFilenames)
{
std::stringstream filename;
filename << "chunk" << vecFilenames.size() << ".mp4";
vecFilenames.push_back(filename.str());
return std::make_unique<std::ofstream>(filename.str(), std::ios::trunc);
}
//Split the file into chunks
void split(std::istream &inStream, int nMegaBytesPerChunk, std::vector<std::string> &vecFilenames)
{
std::unique_ptr<char[]> buffer(new char[size1MB]);
int nCurrentMegaBytes = 0;
std::unique_ptr<std::ostream> pOutStream = createChunkFile(vecFilenames);
while (!inStream.eof())
{
inStream.read(buffer.get(), size1MB);
pOutStream->write(buffer.get(), inStream.gcount());
++nCurrentMegaBytes;
if (nCurrentMegaBytes >= nMegaBytesPerChunk)
{
pOutStream = createChunkFile(vecFilenames);
nCurrentMegaBytes = 0;
}
}
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::rep);
socket.bind("tcp://*:5555");
std::ifstream img("video2.mp4", std::ifstream::in | std::ios::binary);
std::ifstream aux;
std::vector<std::string> vecFilenames;
std::vector<zmq::const_buffer> data;
std::ostringstream os;
std::async(std::launch::async, [&img, &vecFilenames]() {
split(img, 100, vecFilenames);
});
img.close();
zmq::message_t message, aux2;
socket.recv(message, zmq::recv_flags::none);
//Put the chunks into the vector
std::async([&data, &aux, &os, &vecFilenames]() {
for (int i = 0; i < vecFilenames.size(); i++)
{
std::async([&aux, &i]() {
aux.open("chunk" + std::to_string(i) + ".mp4", std::ifstream::in | std::ios::binary);
});
os << aux.rdbuf();
data.push_back(zmq::buffer(os.str()));
os.clear();
aux.close();
}
});
//Send the vector for the client
std::async([&socket, &data] {
zmq::send_multipart(socket, data);
});
}
客户端:
#include <fstream>
#include <sstream>
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <queue>
#include <deque>
#include <future>
#include <vector>
using namespace std::chrono_literals;
template <typename T>
void debug(T x)
{
std::cout << x << std::endl;
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::req);
socket.connect("tcp://localhost:5555");
std::ofstream img("teste.mp4", std::ios::out | std::ios::binary);
socket.send(zmq::buffer("ok\n"), zmq::send_flags::none);
std::vector<zmq::message_t> send_msgs;
zmq::message_t size;
std::async([&send_msgs, &img, &socket] {
zmq::recv_multipart(socket, std::back_inserter(send_msgs));
while (send_msgs.size())
{
img << send_msgs[0].to_string();
send_msgs.erase(send_msgs.begin());
}
});
}
尝试通过 multipart-message 移动所有数据确实会将所有数据收集到一个巨大的 BLOB 中,再加上添加重复的 O/S-level transport-class 特定缓冲区,最可能的结果是崩溃。
将 video-BLOB 的各个块作为单独的 simple-message 有效载荷发送并重建 BLOB(最好通过索引编号,可以选择 re-request 任何未到达的部分receiver-side ).
使用带有 REQ/REP
的 std::async
模式似乎有点棘手,因为此原型必须保持其 dFSA 交错序列 .recv()-.send()-.recv()- .send()-...ad infinitum...因为如果不这样做就会陷入无法挽救的相互僵局。
对于流媒体视频(如 CV / scene-detection),还有更多技巧可以使用 - 其中之一是使用 ZMQ_CONFLATE
选项,以便发送最近的 video-frame,不会在“过时”scene-images 上浪费时间,它们已经成为历史的一部分,因此总是将最近的 video-frame 交付给 receiving-side 处理。
我正在尝试使用 ZeroMQ
基础设施发送视频,我将视频拆分成多个块来发送。当我这样做并将视频放入向量中以通过 zmq::send_multipart
发送时,我的 RAM 内存使用率非常高,因此有时会出现分段错误。
我头疼的是,当我评论发送multipart消息的行并运行程序时,vector是正常制作的,我没有得到分段错误,虽然RAM内存的消耗不是那么重.
有人可以告诉我如何发送此文件吗?
服务器代码:
#include <fstream>
#include <sstream>
#include <chrono>
#include <thread>
#include <iostream>
#include <future>
#include <zmq.hpp>
#include <zmq_addon.hpp>
using namespace std::chrono_literals;
const int size1MB = 1024 * 1024;
template <typename T>
void debug(T x)
{
std::cout << x << std::endl;
}
//Generate new chunks
std::unique_ptr<std::ofstream> createChunkFile(std::vector<std::string> &vecFilenames)
{
std::stringstream filename;
filename << "chunk" << vecFilenames.size() << ".mp4";
vecFilenames.push_back(filename.str());
return std::make_unique<std::ofstream>(filename.str(), std::ios::trunc);
}
//Split the file into chunks
void split(std::istream &inStream, int nMegaBytesPerChunk, std::vector<std::string> &vecFilenames)
{
std::unique_ptr<char[]> buffer(new char[size1MB]);
int nCurrentMegaBytes = 0;
std::unique_ptr<std::ostream> pOutStream = createChunkFile(vecFilenames);
while (!inStream.eof())
{
inStream.read(buffer.get(), size1MB);
pOutStream->write(buffer.get(), inStream.gcount());
++nCurrentMegaBytes;
if (nCurrentMegaBytes >= nMegaBytesPerChunk)
{
pOutStream = createChunkFile(vecFilenames);
nCurrentMegaBytes = 0;
}
}
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::rep);
socket.bind("tcp://*:5555");
std::ifstream img("video2.mp4", std::ifstream::in | std::ios::binary);
std::ifstream aux;
std::vector<std::string> vecFilenames;
std::vector<zmq::const_buffer> data;
std::ostringstream os;
std::async(std::launch::async, [&img, &vecFilenames]() {
split(img, 100, vecFilenames);
});
img.close();
zmq::message_t message, aux2;
socket.recv(message, zmq::recv_flags::none);
//Put the chunks into the vector
std::async([&data, &aux, &os, &vecFilenames]() {
for (int i = 0; i < vecFilenames.size(); i++)
{
std::async([&aux, &i]() {
aux.open("chunk" + std::to_string(i) + ".mp4", std::ifstream::in | std::ios::binary);
});
os << aux.rdbuf();
data.push_back(zmq::buffer(os.str()));
os.clear();
aux.close();
}
});
//Send the vector for the client
std::async([&socket, &data] {
zmq::send_multipart(socket, data);
});
}
客户端:
#include <fstream>
#include <sstream>
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <queue>
#include <deque>
#include <future>
#include <vector>
using namespace std::chrono_literals;
template <typename T>
void debug(T x)
{
std::cout << x << std::endl;
}
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::req);
socket.connect("tcp://localhost:5555");
std::ofstream img("teste.mp4", std::ios::out | std::ios::binary);
socket.send(zmq::buffer("ok\n"), zmq::send_flags::none);
std::vector<zmq::message_t> send_msgs;
zmq::message_t size;
std::async([&send_msgs, &img, &socket] {
zmq::recv_multipart(socket, std::back_inserter(send_msgs));
while (send_msgs.size())
{
img << send_msgs[0].to_string();
send_msgs.erase(send_msgs.begin());
}
});
}
尝试通过 multipart-message 移动所有数据确实会将所有数据收集到一个巨大的 BLOB 中,再加上添加重复的 O/S-level transport-class 特定缓冲区,最可能的结果是崩溃。
将 video-BLOB 的各个块作为单独的 simple-message 有效载荷发送并重建 BLOB(最好通过索引编号,可以选择 re-request 任何未到达的部分receiver-side ).
使用带有 REQ/REP
的 std::async
模式似乎有点棘手,因为此原型必须保持其 dFSA 交错序列 .recv()-.send()-.recv()- .send()-...ad infinitum...因为如果不这样做就会陷入无法挽救的相互僵局。
对于流媒体视频(如 CV / scene-detection),还有更多技巧可以使用 - 其中之一是使用 ZMQ_CONFLATE
选项,以便发送最近的 video-frame,不会在“过时”scene-images 上浪费时间,它们已经成为历史的一部分,因此总是将最近的 video-frame 交付给 receiving-side 处理。