DDS Openslice:如何在 python 中创建监听器
DDS Openslice: How to create a listener in python
我正在使用 ADLINK 的 OpenSplice 及其 Python API。我似乎找不到好的文档或 class 参考。我想设置一种非阻塞方式来接收多条消息。他们的 Listener 似乎提供了这个,但不清楚如何在 python.
中设置它
他们的DDS教程给出了一个C例子:
class TempSensorListener :
public dds::sub::NoOpDataReaderListener<tutorial::TempSensorType>
{
public:
virtual void on_data_available(dds::sub::DataReader<tutorial::TempSensorType>& dr)
{
...
});
}
};
TempSensorListener listener;
dr.listener(&listener, dds::core::status::StatusMask::data_available());
这似乎表明数据接收器有一个“监听器”方法,用于为数据分配一个监听器reader。
如何使用 Python API 完成此操作?我似乎无法从 python 数据接收器中找到侦听器方法。
所提供的 Python 示例(example1.py 和 example2.py)提供
# Data available listener
class DataAvailableListener(Listener):
def __init__(self):
Listener.__init__(self)
def on_data_available(self, entity):
print('on_data_available called')
l = entity.read(10)
for (sd, si) in l:
sd.print_vars()
但我没有看到 class 的实例化。该示例似乎使用了 Waitset 而根本没有使用侦听器
我希望是这样的:
listener = DataAvailableListener()
reader.listener(listener)
reader 确实有一个名为 listener 的属性。我将对象分配给该属性,但它似乎没有任何效果。
我如何找到与 6.10.4 相同问题的解决方案:
参考 dds
包中的文档(包含在您的 $OSPL_HOME/tools/python/docs/html/dds.html
中),您可以使用 Subscriber
[=20] 的 create_datareader
方法设置监听器=]:
from dds import *
from foo import foo_type # idlpp generated module/class
# Data available listener
class DataAvailableListener(Listener):
def __init__(self):
Listener.__init__(self)
def on_data_available(self, entity):
print('on_data_available called')
l = entity.read(10)
for (sd, si) in l:
sd.print_vars()
dp = DomainParticipant()
topic = dp.create_topic('foo_topic',foo_type)
sub = dp.create_subscriber()
sub.create_datareader(topic,listener=DataAvailableListener())
我正在使用 ADLINK 的 OpenSplice 及其 Python API。我似乎找不到好的文档或 class 参考。我想设置一种非阻塞方式来接收多条消息。他们的 Listener 似乎提供了这个,但不清楚如何在 python.
中设置它他们的DDS教程给出了一个C例子:
class TempSensorListener :
public dds::sub::NoOpDataReaderListener<tutorial::TempSensorType>
{
public:
virtual void on_data_available(dds::sub::DataReader<tutorial::TempSensorType>& dr)
{
...
});
}
};
TempSensorListener listener;
dr.listener(&listener, dds::core::status::StatusMask::data_available());
这似乎表明数据接收器有一个“监听器”方法,用于为数据分配一个监听器reader。
如何使用 Python API 完成此操作?我似乎无法从 python 数据接收器中找到侦听器方法。
所提供的 Python 示例(example1.py 和 example2.py)提供
# Data available listener
class DataAvailableListener(Listener):
def __init__(self):
Listener.__init__(self)
def on_data_available(self, entity):
print('on_data_available called')
l = entity.read(10)
for (sd, si) in l:
sd.print_vars()
但我没有看到 class 的实例化。该示例似乎使用了 Waitset 而根本没有使用侦听器
我希望是这样的:
listener = DataAvailableListener()
reader.listener(listener)
reader 确实有一个名为 listener 的属性。我将对象分配给该属性,但它似乎没有任何效果。
我如何找到与 6.10.4 相同问题的解决方案:
参考 dds
包中的文档(包含在您的 $OSPL_HOME/tools/python/docs/html/dds.html
中),您可以使用 Subscriber
[=20] 的 create_datareader
方法设置监听器=]:
from dds import * from foo import foo_type # idlpp generated module/class # Data available listener class DataAvailableListener(Listener): def __init__(self): Listener.__init__(self) def on_data_available(self, entity): print('on_data_available called') l = entity.read(10) for (sd, si) in l: sd.print_vars() dp = DomainParticipant() topic = dp.create_topic('foo_topic',foo_type) sub = dp.create_subscriber() sub.create_datareader(topic,listener=DataAvailableListener())