如何使用 python 和静态创建的主题 类 创建 OpenSplice DDS 主题?

How to create an OpenSplice DDS topic using python and statically created topic classes?

我一直在尝试使用 ADLINK 的 Vortex OpenSplice Community edition with the Python API (python version 3.6 within a PyEnv virtual environment) on Ubuntu 20.04.2 LTS. I've followed the PythonDCPSAPIGuide 并在 ($OSPL_HOME/tools/python/examples) 中使用 python 示例。但是,我无法弄清楚如何使用 idlpp 为静态生成的主题 class 创建与域参与者关联的主题。我怎样才能做到这一点?

到目前为止我做了什么:

我有一个 IDL 文件,其中包含许多其他 IDL 文件的路径。我已使用以下 bash 脚本将这些 IDL 文件转换为 python 主题 classes:

#!/bin/bash

for FILE in *.idl; do
  $OSPL_HOME/bin/idlpp -I $OSPL_HOME/etc/idl -S -l python -d . $FILE
done

这会创建一系列 python 包(python 主题 classes),我将它们导入我的 python 脚本中,该脚本位于同一目录中。

使用这些包,我想在我的 python 脚本中创建或注册域参与者的主题。例如下面的 python 代码,(但是 'create_topic' 函数不存在):

# myExampleDDSFile.py

from dds import *
from foo import foo_type # idlpp generated module/class
from foo2 import foo_type2 # idlpp generated module/class

dp = DomainParticipant()
topic = dp.create_topic('foo_topic',foo_type) # this function doesn't exist for a domain participant
pub = dp.create_publisher()

这是否可行?如果可行,我将如何注册我在 python 中与域参与者静态创建的主题?


我注意到在提供的 python 示例中(例如 $OSPL_HOME/tools/python/examples/example1.py)使用下面的代码动态注册了一个主题,但我认为这不相关静态生成 python 主题 classes:

# example1.py snippet

dp = DomainParticipant()
gen_info = ddsutil.get_dds_classes_from_idl('example1.idl', 'basic::module_SequenceOfStruct::SequenceOfStruct_struct')
topic = gen_info.register_topic(dp, 'Example1')

我在 source code.

中也看不到相关功能

如果这是一个简单的问题或者我遗漏了什么,我深表歉意 - 我是 Vortex OpenSplice DDS 的新手。

如有任何帮助,我们将不胜感激。

我无法与 OpenSplice 对话,但您可以使用 CoreDX DDS 做到这一点。例如,给定 IDL 文件“hello.idl”:

struct StringMsg
{
   string msg;
};

运行

coredx_ddl -l python -f hello.idl -d hello

然后下面的python是如何使用生成的'StringMsg'类型构造一个Topic和一个DataReader的例子:

import time
import dds.domain
from  hello import StringMsg

# Use default QoS, and no listener
dp    = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
sub   = dds.sub.Subscriber( dp )
dr    = dds.sub.DataReader( sub, topic )
rc    = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                                 dds.core.ViewState.ANY_VIEW_STATE,
                                 dds.core.InstanceState.ANY_INSTANCE_STATE )
ws    = dds.cond.WaitSet()
ws.attach_condition(rc)

while True:
    t = dds.core.Duration.infinite()
    print('waiting for data...')
    ws.wait(t)
    while True:
        try:
            samples = dr.take( )
            for s in samples:
                if s.info.valid_data:
                    print("received: {}".format(s.data.msg))
        except dds.core.Error:
            break;