流 API 对于 Fast-RTPS
Streams API For Fast-RTPS
我想使用 fast-rtps 向订阅者发布视频(流数据)。虽然我成功发布了十个连续的 jpg 文件,但订阅者收到的每张图片都浪费了很多时间来处理,因为我使用函数 get_byte_value
一张一张地获取像素。
有谁知道如何通过fast-rtps中间件更高效地发布和订阅? (创建新类型?其他?)
下面是我的发布者和订阅者的代码:
Publisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file PicturePublisher.cpp
*
*/
#include "Publisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <thread>
#include <time.h>
#include <vector>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PicturePublisher::PicturePublisher()
: mp_participant(nullptr)
, mp_publisher(nullptr)
, m_DynType(DynamicType_ptr(nullptr))
{
}
bool PicturePublisher::init()
{
cv::Mat image = cv::imread("drone.jpg", 1);
std::vector<unsigned char> buffer;
cv::imencode(".jpg", image, buffer);
// Create basic builders
DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());
DynamicType_ptr octet_type(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type, 3873715));
DynamicType_ptr sequence_type = sequence_type_builder->build();
// Add members to the struct. By the way, id must be consecutive starting by zero.
struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(2, "Picture", sequence_type);
struct_type_builder->set_name("Picture"); // Need to be same with topic data type
DynamicType_ptr dynType = struct_type_builder->build();
m_DynType.SetDynamicType(dynType);
m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
m_DynHello->set_uint32_value(0, 0);
m_DynHello->set_uint32_value(buffer.size(), 1);
MemberId id;
// std::cout << "init: " << id << std::endl;
DynamicData* sequence_data = m_DynHello->loan_value(2);
for (int i = 0; i < buffer.size(); i++) {
if (i == buffer.size() - 1) {
std::cout << "Total Size: " << i + 1 << std::endl;
}
sequence_data->insert_byte_value(buffer[i], id);
}
m_DynHello->return_loaned_value(sequence_data);
ParticipantAttributes PParam;
PParam.rtps.setName("DynPicture_pub");
mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "Picture";
Wparam.topic.topicName = "PictureTopic";
mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
if (mp_publisher == nullptr)
{
return false;
}
return true;
}
PicturePublisher::~PicturePublisher()
{
Domain::removeParticipant(mp_participant);
DynamicDataFactory::get_instance()->delete_data(m_DynHello);
Domain::stopAll();
}
void PicturePublisher::PubListener::onPublicationMatched(
Publisher* /*pub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
firstConnected = true;
std::cout << "Publisher matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Publisher unmatched" << std::endl;
}
}
void PicturePublisher::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PicturePublisher::runThread(
uint32_t samples,
uint32_t sleep)
{
uint32_t i = 0;
while (!stop && (i < samples || samples == 0))
{
if (publish(samples != 0))
{
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
std::cout << "runThreading...; \tSample Index: " << index << "; \t";
uint32_t size;
m_DynHello->get_uint32_value(size, 1);
std::cout << "size: " << size << std::endl;
if (i == 9){
std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
// Avoid unmatched condition impact subscriber receiving message
std::cout << "Wait within twenty second..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
}
++i;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}
}
void PicturePublisher::run(
uint32_t samples,
uint32_t sleep)
{
stop = false;
std::thread thread(&PicturePublisher::runThread, this, samples, sleep);
if (samples == 0)
{
std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
std::cin.ignore();
stop = true;
}
else
{
std::cout << "Publisher running " << samples << " samples." << std::endl;
}
thread.join();
}
bool PicturePublisher::publish(
bool waitForListener)
{
// std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
{
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
m_DynHello->set_uint32_value(index + 1, 0);
mp_publisher->write((void*)m_DynHello);
return true;
}
return false;
}
在PicturePublisher::init()
函数中
Subsciber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file Subscriber.cpp
*
*/
#include "Subscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <vector>
#include <string>
#include <sstream>
#include <iterator>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PictureSubscriber::PictureSubscriber()
: mp_participant(nullptr)
, mp_subscriber(nullptr)
, m_DynType(DynamicType_ptr(nullptr))
{
}
struct timespec begin, end;
double elapsed;
std::vector<unsigned char> buffer;
bool PictureSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.setName("DynPicture_sub");
mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);
if (mp_participant == nullptr)
{
return false;
}
// Create basic builders
DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());
DynamicTypeBuilder_ptr octet_builder(DynamicTypeBuilderFactory::get_instance()->create_byte_builder());
DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_builder.get(), 3873715));
DynamicType_ptr sequence_type = sequence_type_builder->build();
// Add members to the struct.
struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(2, "Picture", sequence_type);
struct_type_builder->set_name("Picture");
DynamicType_ptr dynType = struct_type_builder->build();
m_DynType.SetDynamicType(dynType);
m_listener.m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
//REGISTER THE TYPE
Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "Picture";
Rparam.topic.topicName = "PictureTopic";
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);
if (mp_subscriber == nullptr)
{
return false;
}
return true;
}
PictureSubscriber::~PictureSubscriber()
{
Domain::removeParticipant(mp_participant);
DynamicDataFactory::get_instance()->delete_data(m_listener.m_DynHello);
Domain::stopAll();
}
void PictureSubscriber::SubListener::onSubscriptionMatched(
Subscriber* /*sub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << "Subscriber matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Subscriber unmatched" << std::endl;
}
}
void PictureSubscriber::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PictureSubscriber::SubListener::onNewDataMessage(
Subscriber* sub)
{
if (sub->takeNextData((void*)m_DynHello, &m_info))
{
if (m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
std::cout << "index: " << index << "; \t";
uint32_t size;
m_DynHello->get_uint32_value(size, 1);
std::cout << "size: " << size << std::endl;
DynamicType_ptr octet_type_temp(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
DynamicTypeBuilder_ptr sequence_type_builder_temp(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type_temp, 3873715));
DynamicType_ptr sequence_type_temp = sequence_type_builder_temp->build();
DynamicData* sequence_data_temp = m_DynHello->loan_value(2);
for (int i = 0; i < size; i++) {
buffer.push_back(sequence_data_temp->get_byte_value(i));
}
m_DynHello->return_loaned_value(sequence_data_temp);
cv::Mat imageDecoded = cv::imdecode(buffer, 1);
cv::imwrite(std::to_string(index) + "_droneNew.jpg", imageDecoded);
}
}
}
void PictureSubscriber::run()
{
std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
std::cin.ignore();
}
void PictureSubscriber::run(
uint32_t number)
{
std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
while (number > this->m_listener.n_samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
在PictureSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
函数中
在 eProsima,我们已经为您指出的问题找到了一些解决方案。
首先请注意,您不需要使用动态类型来定义包含您要传输的图像的类型。在您的案例中,最简单的做法是通过 IDL 文件定义您的类型。使用 IDL 文件和 Fast-DDS-Gen 工具,您可以生成访问数据类型元素的代码,并自动生成数据序列化和反序列化函数。在 Picture.idl 文件中,您将找到以 IDL 格式定义的类型,它最适合您使用动态类型创建的数据类型。 Here you can find a guide on how to use the Fast-DDS-Gen tool. In this documentation you will also find a complete example of how an IDL file can be used to generate a complete DDS publisher/subscriber application, as well as the supported formats 用于数据。下面还有文件 Publisher.cpp 和 Subscriber.cpp 已根据新数据类型进行了修改。
我们还建议您查看示例 HelloWorldExample,因为它最适合您的需要。在此示例中,您还可以发现新的 DDS API,包含在最新版本的 Fast DDS (2.1.0) 中。
作为附加评论,我们建议您在传输图像之前以字符串 base64 格式对图像进行编码,而不是传输八位位组向量,因为它是图像传输最广泛的格式之一。
Picture.idl
struct Picture
{
unsigned long index;
unsigned long size;
sequence<octet> picture;
};
Publisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file PicturePublisher.cpp
*
*/
#include "PicturePublisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <thread>
#include <time.h>
#include <vector>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PicturePublisher::PicturePublisher()
: mp_participant(nullptr)
, mp_publisher(nullptr)
{
}
bool PicturePublisher::init()
{
cv::Mat image = cv::imread("dog.jpg", cv::IMREAD_COLOR);
if(image.empty())
{
std::cout << "Could not read the image." << std::endl;
return false;
}
cv::imshow("Display window", image);
int k = cv::waitKey(0);
std::vector<unsigned char> buffer;
if(!cv::imencode(".jpg", image, buffer)){
printf("Image encoding failed");
}
m_Picture.index(0);
m_Picture.size(buffer.size());
m_Picture.picture(buffer);
ParticipantAttributes PParam;
PParam.rtps.setName("Picture_pub");
mp_participant = Domain::createParticipant(PParam, &m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerType(mp_participant, &m_type);
// Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "Picture";
Wparam.topic.topicName = "PictureTopic";
mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
if (mp_publisher == nullptr)
{
return false;
}
return true;
}
PicturePublisher::~PicturePublisher()
{
Domain::removeParticipant(mp_participant);
}
void PicturePublisher::PubListener::onPublicationMatched(
Publisher* /*pub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
firstConnected = true;
std::cout << "Publisher matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Publisher unmatched" << std::endl;
}
}
void PicturePublisher::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PicturePublisher::runThread(
uint32_t samples,
uint32_t sleep)
{
uint32_t i = 0;
while (!stop && (i < samples || samples == 0))
{
if (publish(samples != 0))
{
std::cout << "runThreading...; \tSample Index: " << m_Picture.index() << "; \t";
std::cout << "size: " << m_Picture.size() << std::endl;
if (i == 9){
std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
// Avoid unmatched condition impact subscriber receiving message
std::cout << "Wait within twenty second..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
}
++i;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}
}
void PicturePublisher::run(
uint32_t samples,
uint32_t sleep)
{
stop = false;
std::thread thread(&PicturePublisher::runThread, this, samples, sleep);
if (samples == 0)
{
std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
std::cin.ignore();
stop = true;
}
else
{
std::cout << "Publisher running " << samples << " samples." << std::endl;
}
thread.join();
}
bool PicturePublisher::publish(
bool waitForListener)
{
// std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
{
m_Picture.index(m_Picture.index() + 1);
mp_publisher->write((void*)&m_Picture);
return true;
}
return false;
}
Subscriber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file Subscriber.cpp
*
*/
#include "PictureSubscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <vector>
#include <string>
#include <sstream>
#include <iterator>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PictureSubscriber::PictureSubscriber()
: mp_participant(nullptr)
, mp_subscriber(nullptr)
{
}
struct timespec begin, end;
double elapsed;
std::vector<unsigned char> buffer;
bool PictureSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.setName("Picture_sub");
mp_participant = Domain::createParticipant(PParam, &m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerType(mp_participant, &m_type);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "Picture";
Rparam.topic.topicName = "PictureTopic";
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);
if (mp_subscriber == nullptr)
{
return false;
}
return true;
}
PictureSubscriber::~PictureSubscriber()
{
Domain::removeParticipant(mp_participant);
}
void PictureSubscriber::SubListener::onSubscriptionMatched(
Subscriber* /*sub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << "Subscriber matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Subscriber unmatched" << std::endl;
}
}
void PictureSubscriber::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PictureSubscriber::SubListener::onNewDataMessage(
Subscriber* sub)
{
std::cout << "Data received." << std::endl;
if (sub->takeNextData((void*)&m_Picture, &m_info))
{
if (m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
uint32_t index = m_Picture.index();
std::cout << "index: " << index << "; \t";
std::cout << "size: " << m_Picture.size() << std::endl;
cv::Mat imageDecoded = cv::imdecode(m_Picture.picture(), 1);
cv::imwrite(std::to_string(index) + "_dog_received.jpg", imageDecoded);
}
}
}
void PictureSubscriber::run()
{
std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
std::cin.ignore();
}
void PictureSubscriber::run(
uint32_t number)
{
std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
while (number > this->m_listener.n_samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
我想使用 fast-rtps 向订阅者发布视频(流数据)。虽然我成功发布了十个连续的 jpg 文件,但订阅者收到的每张图片都浪费了很多时间来处理,因为我使用函数 get_byte_value
一张一张地获取像素。
有谁知道如何通过fast-rtps中间件更高效地发布和订阅? (创建新类型?其他?)
下面是我的发布者和订阅者的代码:
Publisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file PicturePublisher.cpp
*
*/
#include "Publisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <thread>
#include <time.h>
#include <vector>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PicturePublisher::PicturePublisher()
: mp_participant(nullptr)
, mp_publisher(nullptr)
, m_DynType(DynamicType_ptr(nullptr))
{
}
bool PicturePublisher::init()
{
cv::Mat image = cv::imread("drone.jpg", 1);
std::vector<unsigned char> buffer;
cv::imencode(".jpg", image, buffer);
// Create basic builders
DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());
DynamicType_ptr octet_type(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type, 3873715));
DynamicType_ptr sequence_type = sequence_type_builder->build();
// Add members to the struct. By the way, id must be consecutive starting by zero.
struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(2, "Picture", sequence_type);
struct_type_builder->set_name("Picture"); // Need to be same with topic data type
DynamicType_ptr dynType = struct_type_builder->build();
m_DynType.SetDynamicType(dynType);
m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
m_DynHello->set_uint32_value(0, 0);
m_DynHello->set_uint32_value(buffer.size(), 1);
MemberId id;
// std::cout << "init: " << id << std::endl;
DynamicData* sequence_data = m_DynHello->loan_value(2);
for (int i = 0; i < buffer.size(); i++) {
if (i == buffer.size() - 1) {
std::cout << "Total Size: " << i + 1 << std::endl;
}
sequence_data->insert_byte_value(buffer[i], id);
}
m_DynHello->return_loaned_value(sequence_data);
ParticipantAttributes PParam;
PParam.rtps.setName("DynPicture_pub");
mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "Picture";
Wparam.topic.topicName = "PictureTopic";
mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
if (mp_publisher == nullptr)
{
return false;
}
return true;
}
PicturePublisher::~PicturePublisher()
{
Domain::removeParticipant(mp_participant);
DynamicDataFactory::get_instance()->delete_data(m_DynHello);
Domain::stopAll();
}
void PicturePublisher::PubListener::onPublicationMatched(
Publisher* /*pub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
firstConnected = true;
std::cout << "Publisher matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Publisher unmatched" << std::endl;
}
}
void PicturePublisher::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PicturePublisher::runThread(
uint32_t samples,
uint32_t sleep)
{
uint32_t i = 0;
while (!stop && (i < samples || samples == 0))
{
if (publish(samples != 0))
{
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
std::cout << "runThreading...; \tSample Index: " << index << "; \t";
uint32_t size;
m_DynHello->get_uint32_value(size, 1);
std::cout << "size: " << size << std::endl;
if (i == 9){
std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
// Avoid unmatched condition impact subscriber receiving message
std::cout << "Wait within twenty second..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
}
++i;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}
}
void PicturePublisher::run(
uint32_t samples,
uint32_t sleep)
{
stop = false;
std::thread thread(&PicturePublisher::runThread, this, samples, sleep);
if (samples == 0)
{
std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
std::cin.ignore();
stop = true;
}
else
{
std::cout << "Publisher running " << samples << " samples." << std::endl;
}
thread.join();
}
bool PicturePublisher::publish(
bool waitForListener)
{
// std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
{
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
m_DynHello->set_uint32_value(index + 1, 0);
mp_publisher->write((void*)m_DynHello);
return true;
}
return false;
}
在PicturePublisher::init()
函数中
Subsciber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file Subscriber.cpp
*
*/
#include "Subscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <vector>
#include <string>
#include <sstream>
#include <iterator>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PictureSubscriber::PictureSubscriber()
: mp_participant(nullptr)
, mp_subscriber(nullptr)
, m_DynType(DynamicType_ptr(nullptr))
{
}
struct timespec begin, end;
double elapsed;
std::vector<unsigned char> buffer;
bool PictureSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.setName("DynPicture_sub");
mp_participant = Domain::createParticipant(PParam, (ParticipantListener*)&m_part_list);
if (mp_participant == nullptr)
{
return false;
}
// Create basic builders
DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());
DynamicTypeBuilder_ptr octet_builder(DynamicTypeBuilderFactory::get_instance()->create_byte_builder());
DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_builder.get(), 3873715));
DynamicType_ptr sequence_type = sequence_type_builder->build();
// Add members to the struct.
struct_type_builder->add_member(0, "index", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(1, "size", DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
struct_type_builder->add_member(2, "Picture", sequence_type);
struct_type_builder->set_name("Picture");
DynamicType_ptr dynType = struct_type_builder->build();
m_DynType.SetDynamicType(dynType);
m_listener.m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
//REGISTER THE TYPE
Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "Picture";
Rparam.topic.topicName = "PictureTopic";
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);
if (mp_subscriber == nullptr)
{
return false;
}
return true;
}
PictureSubscriber::~PictureSubscriber()
{
Domain::removeParticipant(mp_participant);
DynamicDataFactory::get_instance()->delete_data(m_listener.m_DynHello);
Domain::stopAll();
}
void PictureSubscriber::SubListener::onSubscriptionMatched(
Subscriber* /*sub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << "Subscriber matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Subscriber unmatched" << std::endl;
}
}
void PictureSubscriber::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PictureSubscriber::SubListener::onNewDataMessage(
Subscriber* sub)
{
if (sub->takeNextData((void*)m_DynHello, &m_info))
{
if (m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
uint32_t index;
m_DynHello->get_uint32_value(index, 0);
std::cout << "index: " << index << "; \t";
uint32_t size;
m_DynHello->get_uint32_value(size, 1);
std::cout << "size: " << size << std::endl;
DynamicType_ptr octet_type_temp(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
DynamicTypeBuilder_ptr sequence_type_builder_temp(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type_temp, 3873715));
DynamicType_ptr sequence_type_temp = sequence_type_builder_temp->build();
DynamicData* sequence_data_temp = m_DynHello->loan_value(2);
for (int i = 0; i < size; i++) {
buffer.push_back(sequence_data_temp->get_byte_value(i));
}
m_DynHello->return_loaned_value(sequence_data_temp);
cv::Mat imageDecoded = cv::imdecode(buffer, 1);
cv::imwrite(std::to_string(index) + "_droneNew.jpg", imageDecoded);
}
}
}
void PictureSubscriber::run()
{
std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
std::cin.ignore();
}
void PictureSubscriber::run(
uint32_t number)
{
std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
while (number > this->m_listener.n_samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
在PictureSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
函数中
在 eProsima,我们已经为您指出的问题找到了一些解决方案。
首先请注意,您不需要使用动态类型来定义包含您要传输的图像的类型。在您的案例中,最简单的做法是通过 IDL 文件定义您的类型。使用 IDL 文件和 Fast-DDS-Gen 工具,您可以生成访问数据类型元素的代码,并自动生成数据序列化和反序列化函数。在 Picture.idl 文件中,您将找到以 IDL 格式定义的类型,它最适合您使用动态类型创建的数据类型。 Here you can find a guide on how to use the Fast-DDS-Gen tool. In this documentation you will also find a complete example of how an IDL file can be used to generate a complete DDS publisher/subscriber application, as well as the supported formats 用于数据。下面还有文件 Publisher.cpp 和 Subscriber.cpp 已根据新数据类型进行了修改。
我们还建议您查看示例 HelloWorldExample,因为它最适合您的需要。在此示例中,您还可以发现新的 DDS API,包含在最新版本的 Fast DDS (2.1.0) 中。
作为附加评论,我们建议您在传输图像之前以字符串 base64 格式对图像进行编码,而不是传输八位位组向量,因为它是图像传输最广泛的格式之一。
Picture.idl
struct Picture
{
unsigned long index;
unsigned long size;
sequence<octet> picture;
};
Publisher.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file PicturePublisher.cpp
*
*/
#include "PicturePublisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <thread>
#include <time.h>
#include <vector>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PicturePublisher::PicturePublisher()
: mp_participant(nullptr)
, mp_publisher(nullptr)
{
}
bool PicturePublisher::init()
{
cv::Mat image = cv::imread("dog.jpg", cv::IMREAD_COLOR);
if(image.empty())
{
std::cout << "Could not read the image." << std::endl;
return false;
}
cv::imshow("Display window", image);
int k = cv::waitKey(0);
std::vector<unsigned char> buffer;
if(!cv::imencode(".jpg", image, buffer)){
printf("Image encoding failed");
}
m_Picture.index(0);
m_Picture.size(buffer.size());
m_Picture.picture(buffer);
ParticipantAttributes PParam;
PParam.rtps.setName("Picture_pub");
mp_participant = Domain::createParticipant(PParam, &m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerType(mp_participant, &m_type);
// Domain::registerDynamicType(mp_participant, &m_DynType);
//CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "Picture";
Wparam.topic.topicName = "PictureTopic";
mp_publisher = Domain::createPublisher(mp_participant, Wparam, (PublisherListener*)&m_listener);
if (mp_publisher == nullptr)
{
return false;
}
return true;
}
PicturePublisher::~PicturePublisher()
{
Domain::removeParticipant(mp_participant);
}
void PicturePublisher::PubListener::onPublicationMatched(
Publisher* /*pub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
firstConnected = true;
std::cout << "Publisher matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Publisher unmatched" << std::endl;
}
}
void PicturePublisher::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PicturePublisher::runThread(
uint32_t samples,
uint32_t sleep)
{
uint32_t i = 0;
while (!stop && (i < samples || samples == 0))
{
if (publish(samples != 0))
{
std::cout << "runThreading...; \tSample Index: " << m_Picture.index() << "; \t";
std::cout << "size: " << m_Picture.size() << std::endl;
if (i == 9){
std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
// Avoid unmatched condition impact subscriber receiving message
std::cout << "Wait within twenty second..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
}
++i;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}
}
void PicturePublisher::run(
uint32_t samples,
uint32_t sleep)
{
stop = false;
std::thread thread(&PicturePublisher::runThread, this, samples, sleep);
if (samples == 0)
{
std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
std::cin.ignore();
stop = true;
}
else
{
std::cout << "Publisher running " << samples << " samples." << std::endl;
}
thread.join();
}
bool PicturePublisher::publish(
bool waitForListener)
{
// std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
{
m_Picture.index(m_Picture.index() + 1);
mp_publisher->write((void*)&m_Picture);
return true;
}
return false;
}
Subscriber.cpp
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License, Version 2.0 (the "License");
/**
* @file Subscriber.cpp
*
*/
#include "PictureSubscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>
#include <vector>
#include <string>
#include <sstream>
#include <iterator>
#include <opencv2/opencv.hpp>
using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;
// using namespace cv;
PictureSubscriber::PictureSubscriber()
: mp_participant(nullptr)
, mp_subscriber(nullptr)
{
}
struct timespec begin, end;
double elapsed;
std::vector<unsigned char> buffer;
bool PictureSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.setName("Picture_sub");
mp_participant = Domain::createParticipant(PParam, &m_part_list);
if (mp_participant == nullptr)
{
return false;
}
//REGISTER THE TYPE
Domain::registerType(mp_participant, &m_type);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "Picture";
Rparam.topic.topicName = "PictureTopic";
mp_subscriber = Domain::createSubscriber(mp_participant, Rparam, (SubscriberListener*)&m_listener);
if (mp_subscriber == nullptr)
{
return false;
}
return true;
}
PictureSubscriber::~PictureSubscriber()
{
Domain::removeParticipant(mp_participant);
}
void PictureSubscriber::SubListener::onSubscriptionMatched(
Subscriber* /*sub*/,
MatchingInfo& info)
{
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << "Subscriber matched" << std::endl;
}
else
{
n_matched--;
std::cout << "Subscriber unmatched" << std::endl;
}
}
void PictureSubscriber::PartListener::onParticipantDiscovery(
Participant*,
ParticipantDiscoveryInfo&& info)
{
if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
}
else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
}
}
void PictureSubscriber::SubListener::onNewDataMessage(
Subscriber* sub)
{
std::cout << "Data received." << std::endl;
if (sub->takeNextData((void*)&m_Picture, &m_info))
{
if (m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
uint32_t index = m_Picture.index();
std::cout << "index: " << index << "; \t";
std::cout << "size: " << m_Picture.size() << std::endl;
cv::Mat imageDecoded = cv::imdecode(m_Picture.picture(), 1);
cv::imwrite(std::to_string(index) + "_dog_received.jpg", imageDecoded);
}
}
}
void PictureSubscriber::run()
{
std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
std::cin.ignore();
}
void PictureSubscriber::run(
uint32_t number)
{
std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
while (number > this->m_listener.n_samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}