DDS Reader 未丢弃消息
DDS Reader not dropping messages
我正在使用 RTI 学习 DDS(对于这个主题还是很陌生)。我正在创建一个写入订阅者的发布者,订阅者输出消息。我想模拟的一件事是丢包。例如,假设发布者每秒向订阅者写入 4 次,但订阅者每秒只能读取一次(最新消息)。
截至目前,我可以创建发布者和订阅者 w/o 任何被丢弃的包。
我通读了一些文档并找到了 HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS。
如果我错了请纠正我,但我的印象是这基本上会保留从发布者收到的最新消息。相反,订阅者正在接收所有消息,但延迟了 1 秒。
我不想缓存消息而是删除消息。如何模拟 "dropped" 包?
顺便说一句:我不想更改 .xml 文件中的任何内容。我想以编程方式进行。
这是我的一些代码片段。
//Publisher.java
//writer = (MsgDataWriter)publisher.create_datawriter(topic, Publisher.DATAWRITER_QOS_DEFAULT,null /* listener */, StatusKind.STATUS_MASK_NONE);
writer = (MsgDataWriter)publisher.create_datawriter(topic, write, null,
StatusKind.STATUS_MASK_ALL);
if (writer == null) {
System.err.println("create_datawriter error\n");
return;
}
// --- Write --- //
String[] messages= {"1", "2", "test", "3"};
/* Create data sample for writing */
Msg instance = new Msg();
InstanceHandle_t instance_handle = InstanceHandle_t.HANDLE_NIL;
/* For a data type that has a key, if the same instance is going to be
written multiple times, initialize the key here
and register the keyed instance prior to writing */
//instance_handle = writer.register_instance(instance);
final long sendPeriodMillis = (long) (.25 * 1000); // 4 per second
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
if (count == 11)
{
return;
}
System.out.println("Writing Msg, count " + count);
/* Modify the instance to be written here */
instance.message =words[count];
instance.sender = "some user";
/* Write data */
writer.write(instance, instance_handle);
try {
Thread.sleep(sendPeriodMillis);
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
//writer.unregister_instance(instance, instance_handle);
} finally {
// --- Shutdown --- //
if(participant != null) {
participant.delete_contained_entities();
DomainParticipantFactory.TheParticipantFactory.
delete_participant(participant);
}
//Subscriber
// Customize time & Qos for receiving info
DataReaderQos readerQ = new DataReaderQos();
subscriber.get_default_datareader_qos(readerQ);
Duration_t minTime = new Duration_t(1,0);
readerQ.time_based_filter.minimum_separation.sec = minTime.sec;
readerQ.time_based_filter.minimum_separation.nanosec = minTime.nanosec;
readerQ.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
readerQ.reliability.kind = ReliabilityQosPolicyKind.BEST_EFFORT_RELIABILITY_QOS;
reader = (MsgDataReader)subscriber.create_datareader(topic, readerQ, listener, StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
return;
}
// --- Wait for data --- //
final long receivePeriodSec = 1;
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
//System.out.println("Msg subscriber sleeping for "+ receivePeriodSec + " sec...");
try {
Thread.sleep(receivePeriodSec * 1000); // in millisec
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
} finally {
// --- Shutdown --- //
在订阅者端,区分您的应用程序与 DDS 域之间三种不同类型的交互非常有用:轮询、侦听器和 WaitSets
轮询意味着应用程序决定何时读取可用数据。这通常是一种时间驱动机制。
侦听器基本上是回调函数,一旦数据可用,基础结构线程就会调用它们来读取该数据。
WaitSets 实现了一种类似于套接字的机制 select
机制:应用程序线程等待(阻塞)数据可用,并在解除阻塞后读取新数据。
您的应用程序使用了监听器机制。您没有 post 回调函数的实现,但从整体情况来看,监听器实现很可能会在回调被调用的那一刻立即尝试读取数据。没有时间让数据成为您所说的 "pushed out" 或 "dropped"。这种读取发生在与主线程不同的线程中,主线程大部分时间都在休眠。您可以找到有关它的知识库文章 here。
唯一不清楚的是 time_based_filter
QoS 设置的影响。你没有在你的问题中提到这一点,但它确实出现在代码中。我希望这可以过滤掉您的一些样本。不过,这是一种不同于推出历史记录的机制。对于不同的 DDS 实现,基于时间的过滤器的行为可能会以不同的方式实现。您使用哪种产品?
我正在使用 RTI 学习 DDS(对于这个主题还是很陌生)。我正在创建一个写入订阅者的发布者,订阅者输出消息。我想模拟的一件事是丢包。例如,假设发布者每秒向订阅者写入 4 次,但订阅者每秒只能读取一次(最新消息)。
截至目前,我可以创建发布者和订阅者 w/o 任何被丢弃的包。
我通读了一些文档并找到了 HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS。
如果我错了请纠正我,但我的印象是这基本上会保留从发布者收到的最新消息。相反,订阅者正在接收所有消息,但延迟了 1 秒。
我不想缓存消息而是删除消息。如何模拟 "dropped" 包?
顺便说一句:我不想更改 .xml 文件中的任何内容。我想以编程方式进行。
这是我的一些代码片段。
//Publisher.java
//writer = (MsgDataWriter)publisher.create_datawriter(topic, Publisher.DATAWRITER_QOS_DEFAULT,null /* listener */, StatusKind.STATUS_MASK_NONE);
writer = (MsgDataWriter)publisher.create_datawriter(topic, write, null,
StatusKind.STATUS_MASK_ALL);
if (writer == null) {
System.err.println("create_datawriter error\n");
return;
}
// --- Write --- //
String[] messages= {"1", "2", "test", "3"};
/* Create data sample for writing */
Msg instance = new Msg();
InstanceHandle_t instance_handle = InstanceHandle_t.HANDLE_NIL;
/* For a data type that has a key, if the same instance is going to be
written multiple times, initialize the key here
and register the keyed instance prior to writing */
//instance_handle = writer.register_instance(instance);
final long sendPeriodMillis = (long) (.25 * 1000); // 4 per second
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
if (count == 11)
{
return;
}
System.out.println("Writing Msg, count " + count);
/* Modify the instance to be written here */
instance.message =words[count];
instance.sender = "some user";
/* Write data */
writer.write(instance, instance_handle);
try {
Thread.sleep(sendPeriodMillis);
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
//writer.unregister_instance(instance, instance_handle);
} finally {
// --- Shutdown --- //
if(participant != null) {
participant.delete_contained_entities();
DomainParticipantFactory.TheParticipantFactory.
delete_participant(participant);
}
//Subscriber
// Customize time & Qos for receiving info
DataReaderQos readerQ = new DataReaderQos();
subscriber.get_default_datareader_qos(readerQ);
Duration_t minTime = new Duration_t(1,0);
readerQ.time_based_filter.minimum_separation.sec = minTime.sec;
readerQ.time_based_filter.minimum_separation.nanosec = minTime.nanosec;
readerQ.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
readerQ.reliability.kind = ReliabilityQosPolicyKind.BEST_EFFORT_RELIABILITY_QOS;
reader = (MsgDataReader)subscriber.create_datareader(topic, readerQ, listener, StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
return;
}
// --- Wait for data --- //
final long receivePeriodSec = 1;
for (int count = 0;
(sampleCount == 0) || (count < sampleCount);
++count) {
//System.out.println("Msg subscriber sleeping for "+ receivePeriodSec + " sec...");
try {
Thread.sleep(receivePeriodSec * 1000); // in millisec
} catch (InterruptedException ix) {
System.err.println("INTERRUPTED");
break;
}
}
} finally {
// --- Shutdown --- //
在订阅者端,区分您的应用程序与 DDS 域之间三种不同类型的交互非常有用:轮询、侦听器和 WaitSets
轮询意味着应用程序决定何时读取可用数据。这通常是一种时间驱动机制。
侦听器基本上是回调函数,一旦数据可用,基础结构线程就会调用它们来读取该数据。
WaitSets 实现了一种类似于套接字的机制 select
机制:应用程序线程等待(阻塞)数据可用,并在解除阻塞后读取新数据。
您的应用程序使用了监听器机制。您没有 post 回调函数的实现,但从整体情况来看,监听器实现很可能会在回调被调用的那一刻立即尝试读取数据。没有时间让数据成为您所说的 "pushed out" 或 "dropped"。这种读取发生在与主线程不同的线程中,主线程大部分时间都在休眠。您可以找到有关它的知识库文章 here。
唯一不清楚的是 time_based_filter
QoS 设置的影响。你没有在你的问题中提到这一点,但它确实出现在代码中。我希望这可以过滤掉您的一些样本。不过,这是一种不同于推出历史记录的机制。对于不同的 DDS 实现,基于时间的过滤器的行为可能会以不同的方式实现。您使用哪种产品?