Java DDS 无限序列导致内存不足错误
Java DDS Unbounded Sequence Causes Out Of Memory Error
我是 DDS 的新手,正在尝试在 Intellij-IDEA 中编写一个简单的 Java 程序,该程序由 3 个部分组成:
- 发送数据的客户端模拟器。
- 我的程序模拟器,它从客户端接收数据,对其进行处理并将其发送回客户端。
- 读取操纵数据的客户端模拟器。
我在示例中尝试发送的所有数据都是一个简单的字符串。
我正在使用 RTI Code Gen 自动生成大部分代码。
没有 unboundedSupport
标志(字符串限制为 255 个字符)一切正常。但是,在应用 unboundedSupport
标志时,出现以下内存不足错误:
java.lang.OutOfMemoryError: Java heap space
at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error
我正在激活首先读取数据的客户端模拟器。
这是我的 .idl 文件:
struct JsonMessage {
string msg;
};
这是我的主程序(第10行是subscriber1
的初始化):
public static void main(String... args) {
ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber1.consume();
ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber2.consume();
ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
JsonMessageSubscriber subscriber3 =
new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
subscriber3.consume();
}
这是我的 ClientResultsConsumer class:
public class ClientResultsConsumer implements Consumer {
@Override
public void consume(String msg) {
System.out.println("Client results consumer got " + msg);
}
}
这是我的 JsonMessageSubscriber class(第 71 行是 subscriber.create_datareader(
):
public class JsonMessageSubscriber implements DataConsumer {
ExecutorService executor = Executors.newSingleThreadExecutor();
public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {
DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
.create_participant(domainId,
DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (participant == null) {
System.err.println("create_participant error\n");
System.exit(-1);
}
// --- Create subscriber --- //
/* To customize subscriber QoS, use
the configuration file USER_QOS_PROFILES.xml */
Subscriber subscriber = participant.create_subscriber(
DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (subscriber == null) {
System.err.println("create_subscriber error\n");
System.exit(-1);
}
// --- Create topic --- //
/* Register type before creating topic */
String typeName = JsonMessageTypeSupport.get_type_name();
JsonMessageTypeSupport.register_type(participant, typeName);
/* To customize topic QoS, use
the configuration file USER_QOS_PROFILES.xml */
Topic topic = participant.create_topic(
topicName,
typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
null /* listener */, StatusKind.STATUS_MASK_NONE);
if (topic == null) {
System.err.println("create_topic error\n");
System.exit(-1);
}
// --- Create reader --- //
DataReaderListener listener = new JsonMessageListener(consumer);
/* To customize data reader QoS, use
the configuration file USER_QOS_PROFILES.xml */
JsonMessageDataReader reader = (JsonMessageDataReader)
subscriber.create_datareader(
topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
System.exit(-1);
}
}
// -----------------------------------------------------------------------
@Override
public void consume() {
final long scanTimeMillis = 1000;
Runnable task = () -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
};
executor.submit(task);
}
}
不幸的是,除了限制序列大小外,我没有找到解决方案,但我知道将它限制在足够大的数量会解决我的问题,它也需要大量内存,我会而是它不会超过每条消息所需的最低限度。
任何帮助将不胜感激,
谢谢
使用 -unboundedSupport 时,必须在 QoS 文件中设置一些内存阈值。这些阈值在用户手册中进行了描述 here,它们定义了样本内存动态分配或从预分配源重用的阈值。这些必须在 DataReader 和 DataWriter 中设置。
这些阈值的设置实际上取决于您的数据大小,根据您的描述,我没有足够的信息来为您提供在您的场景中有意义的示例。基本上,您不想为每个样本动态分配内存。这可能会对性能产生影响,具体取决于您的数据速率。您想要 select 大多数样本使用预分配内存的值。用户手册中“Writer-Side Memory Management when Working with Large Data" is of video streaming which contains larger less frequent I-frames and smaller more frequent P-frames. You can look at that section and the corresponding DataReader section 示例 XML 文件部分下提供的示例。
我使用示例成功解决了问题here
它所要做的只是将自动生成的 qos 文件路径传递给 subscriber/publisher 构造函数,而不是在初始化域参与者之前编写这些行(这与 link 中提供的示例不同以上,提供的示例对我不起作用):
DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);
我是 DDS 的新手,正在尝试在 Intellij-IDEA 中编写一个简单的 Java 程序,该程序由 3 个部分组成:
- 发送数据的客户端模拟器。
- 我的程序模拟器,它从客户端接收数据,对其进行处理并将其发送回客户端。
- 读取操纵数据的客户端模拟器。
我在示例中尝试发送的所有数据都是一个简单的字符串。
我正在使用 RTI Code Gen 自动生成大部分代码。
没有 unboundedSupport
标志(字符串限制为 255 个字符)一切正常。但是,在应用 unboundedSupport
标志时,出现以下内存不足错误:
java.lang.OutOfMemoryError: Java heap space
at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error
我正在激活首先读取数据的客户端模拟器。
这是我的 .idl 文件:
struct JsonMessage {
string msg;
};
这是我的主程序(第10行是subscriber1
的初始化):
public static void main(String... args) {
ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber1.consume();
ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber2.consume();
ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
JsonMessageSubscriber subscriber3 =
new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
subscriber3.consume();
}
这是我的 ClientResultsConsumer class:
public class ClientResultsConsumer implements Consumer {
@Override
public void consume(String msg) {
System.out.println("Client results consumer got " + msg);
}
}
这是我的 JsonMessageSubscriber class(第 71 行是 subscriber.create_datareader(
):
public class JsonMessageSubscriber implements DataConsumer {
ExecutorService executor = Executors.newSingleThreadExecutor();
public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {
DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
.create_participant(domainId,
DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (participant == null) {
System.err.println("create_participant error\n");
System.exit(-1);
}
// --- Create subscriber --- //
/* To customize subscriber QoS, use
the configuration file USER_QOS_PROFILES.xml */
Subscriber subscriber = participant.create_subscriber(
DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (subscriber == null) {
System.err.println("create_subscriber error\n");
System.exit(-1);
}
// --- Create topic --- //
/* Register type before creating topic */
String typeName = JsonMessageTypeSupport.get_type_name();
JsonMessageTypeSupport.register_type(participant, typeName);
/* To customize topic QoS, use
the configuration file USER_QOS_PROFILES.xml */
Topic topic = participant.create_topic(
topicName,
typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
null /* listener */, StatusKind.STATUS_MASK_NONE);
if (topic == null) {
System.err.println("create_topic error\n");
System.exit(-1);
}
// --- Create reader --- //
DataReaderListener listener = new JsonMessageListener(consumer);
/* To customize data reader QoS, use
the configuration file USER_QOS_PROFILES.xml */
JsonMessageDataReader reader = (JsonMessageDataReader)
subscriber.create_datareader(
topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
System.exit(-1);
}
}
// -----------------------------------------------------------------------
@Override
public void consume() {
final long scanTimeMillis = 1000;
Runnable task = () -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
};
executor.submit(task);
}
}
不幸的是,除了限制序列大小外,我没有找到解决方案,但我知道将它限制在足够大的数量会解决我的问题,它也需要大量内存,我会而是它不会超过每条消息所需的最低限度。
任何帮助将不胜感激, 谢谢
使用 -unboundedSupport 时,必须在 QoS 文件中设置一些内存阈值。这些阈值在用户手册中进行了描述 here,它们定义了样本内存动态分配或从预分配源重用的阈值。这些必须在 DataReader 和 DataWriter 中设置。
这些阈值的设置实际上取决于您的数据大小,根据您的描述,我没有足够的信息来为您提供在您的场景中有意义的示例。基本上,您不想为每个样本动态分配内存。这可能会对性能产生影响,具体取决于您的数据速率。您想要 select 大多数样本使用预分配内存的值。用户手册中“Writer-Side Memory Management when Working with Large Data" is of video streaming which contains larger less frequent I-frames and smaller more frequent P-frames. You can look at that section and the corresponding DataReader section 示例 XML 文件部分下提供的示例。
我使用示例成功解决了问题here
它所要做的只是将自动生成的 qos 文件路径传递给 subscriber/publisher 构造函数,而不是在初始化域参与者之前编写这些行(这与 link 中提供的示例不同以上,提供的示例对我不起作用):
DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);