如何将接收器连接到 REDHAWK 中的外部波形端口?
How to connect a sink to a external waveform port in REDHAWK?
我正在尝试为 REDHAWK 波形编写单元测试。我想使用流源来输入数据,并使用 stream/message 接收器来存储输出。我已经用这种方式为组件编写了单元测试,但也想为波形创建一个测试。我找到了将 StreamSource 连接到波形端口的解决方案,但无法确定如何将接收器连接到波形端口。
对于源和组件(其中self.comp
是组件),通常可以使用以下方式连接它们:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=self.comp,
providesPortName='dataShort_in',
connectionId='testConn')
对于源和波形(其中 self.app
是波形),我能够使以下工作正常进行:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=CorbaObject(self.app.getPort('dataShort_in')),
connectionId='testConn')
但是,对于接收器,我通常会在组件上调用 connect
:
sink = StreamSink('short')
self.comp.connect(sink, usesPortName='dataShort_out')
我尝试通过从波形中获取端口来使用与源案例类似的方法,如下所示:
sink = StreamSink('short')
self.app.getPort('dataShort_out').connectPort(sink, 'outputConn')
然而,这给出了错误:
File "/usr/local/redhawk/core/lib/python/ossie/cf/Port_idl.py", line 86, in connectPort
return self._obj.invoke("connectPort", _0_CF.Port._d_connectPort, args)
BAD_PARAM: CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType, CORBA.COMPLETED_NO, ["Expecting object reference, got <class 'bulkio.sandbox.streamsink.StreamSink'>", "Operation 'connectPort' parameter 0"])
我不确定如何获得一个 CORBA obj ref 供此处使用的接收器使用。或者我可以在这里使用另一种方法将端口连接到接收器吗?
我在 Centos 7 上使用 REDHAWK 2.2.2。
我想我已经找到了解决我自己问题的方法。我最终创建了一个新的 class 来管理适用于接收器和源的端口连接。我称它为 ConnectionManager
(希望它不会与 ossie.utils.model.connection.ConnectionManager
class.
混淆
class ConnectionManager:
def __init__(self):
self.connections = list()
def clear(self):
del self.connections[:]
def connect(self, usesPort, providesPort, id):
usesPort.connectPort(providesPort, id)
self.connections.append( (usesPort, id))
def disconnectAll(self):
for port, id in self.connections:
port.disconnectPort(id)
self.clear()
这是一个使用 StreamSource
的示例(self.cm
是 ConnectionManager
):
strm = sb.StreamSource(streamID='strm1', format='short')
self.cm.connect(strm.getPort('shortOut'),
self.app.getPort('dataShort_in'),
'connID')
以及使用 StreamSink
的示例:
sink = sb.StreamSink('short')
self.cm.connect(self.app.getPort('dataShort_out'),
sink.getPort('shortIn'),
'conn2ID')
我的单元测试 setUp
方法调用 self.cm.clear()
,tearDown
方法调用 self.cm.disconnectAll()
以在每次测试后清理连接。
我唯一不明白的是接收器和源端口的名称 classes。使用 {format}{In|Out}
名称有效,但我不知道为什么。
您申请将组件连接到接收器的相同过程适用于应用程序,只要该应用程序是沙箱对象而不是 CORBA 对象:
dom = redhawk.attach()
app = dom.apps[0]
sink = sb.StreamSink('short')
app.connect(sink)
下一个代码显示端口的名称。在这种情况下,只有一种短类型。
from pprint import pprint
pprint(sink._providesPortDict)
下面的代码显示了使用 CORBA 引用而不是沙箱对象的语法。
sink_port = sink.getPort('shortIn')
ref = app.ref
ref.getPort('dataShort_out').connectPort(sink_port, 'outputConn')
您可以 运行 沙盒中的波形。请注意,波形的组件需要 运行 在本地主机上。
使用 nodeBooter
shell 命令或 redhawk
Python 包中的 kickDomain
启动域管理器和设备管理器。
沙盒中 运行 波形的示例代码:
import os
from ossie.utils import redhawk, sb
dom = redhawk.attach()
SDRROOT = os.getenv('SDRROOT')
waveform_dir = os.path.join(SDRROOT, 'dom', 'waveforms')
waveform_name = os.listdir(waveform_dir)[0]
app = dom.createApplication(waveform_name)
sink = sb.StreamSink()
app.connect(sink)
我正在尝试为 REDHAWK 波形编写单元测试。我想使用流源来输入数据,并使用 stream/message 接收器来存储输出。我已经用这种方式为组件编写了单元测试,但也想为波形创建一个测试。我找到了将 StreamSource 连接到波形端口的解决方案,但无法确定如何将接收器连接到波形端口。
对于源和组件(其中self.comp
是组件),通常可以使用以下方式连接它们:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=self.comp,
providesPortName='dataShort_in',
connectionId='testConn')
对于源和波形(其中 self.app
是波形),我能够使以下工作正常进行:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=CorbaObject(self.app.getPort('dataShort_in')),
connectionId='testConn')
但是,对于接收器,我通常会在组件上调用 connect
:
sink = StreamSink('short')
self.comp.connect(sink, usesPortName='dataShort_out')
我尝试通过从波形中获取端口来使用与源案例类似的方法,如下所示:
sink = StreamSink('short')
self.app.getPort('dataShort_out').connectPort(sink, 'outputConn')
然而,这给出了错误:
File "/usr/local/redhawk/core/lib/python/ossie/cf/Port_idl.py", line 86, in connectPort
return self._obj.invoke("connectPort", _0_CF.Port._d_connectPort, args)
BAD_PARAM: CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType, CORBA.COMPLETED_NO, ["Expecting object reference, got <class 'bulkio.sandbox.streamsink.StreamSink'>", "Operation 'connectPort' parameter 0"])
我不确定如何获得一个 CORBA obj ref 供此处使用的接收器使用。或者我可以在这里使用另一种方法将端口连接到接收器吗?
我在 Centos 7 上使用 REDHAWK 2.2.2。
我想我已经找到了解决我自己问题的方法。我最终创建了一个新的 class 来管理适用于接收器和源的端口连接。我称它为 ConnectionManager
(希望它不会与 ossie.utils.model.connection.ConnectionManager
class.
class ConnectionManager:
def __init__(self):
self.connections = list()
def clear(self):
del self.connections[:]
def connect(self, usesPort, providesPort, id):
usesPort.connectPort(providesPort, id)
self.connections.append( (usesPort, id))
def disconnectAll(self):
for port, id in self.connections:
port.disconnectPort(id)
self.clear()
这是一个使用 StreamSource
的示例(self.cm
是 ConnectionManager
):
strm = sb.StreamSource(streamID='strm1', format='short')
self.cm.connect(strm.getPort('shortOut'),
self.app.getPort('dataShort_in'),
'connID')
以及使用 StreamSink
的示例:
sink = sb.StreamSink('short')
self.cm.connect(self.app.getPort('dataShort_out'),
sink.getPort('shortIn'),
'conn2ID')
我的单元测试 setUp
方法调用 self.cm.clear()
,tearDown
方法调用 self.cm.disconnectAll()
以在每次测试后清理连接。
我唯一不明白的是接收器和源端口的名称 classes。使用 {format}{In|Out}
名称有效,但我不知道为什么。
您申请将组件连接到接收器的相同过程适用于应用程序,只要该应用程序是沙箱对象而不是 CORBA 对象:
dom = redhawk.attach()
app = dom.apps[0]
sink = sb.StreamSink('short')
app.connect(sink)
下一个代码显示端口的名称。在这种情况下,只有一种短类型。
from pprint import pprint
pprint(sink._providesPortDict)
下面的代码显示了使用 CORBA 引用而不是沙箱对象的语法。
sink_port = sink.getPort('shortIn')
ref = app.ref
ref.getPort('dataShort_out').connectPort(sink_port, 'outputConn')
您可以 运行 沙盒中的波形。请注意,波形的组件需要 运行 在本地主机上。
使用 nodeBooter
shell 命令或 redhawk
Python 包中的 kickDomain
启动域管理器和设备管理器。
沙盒中 运行 波形的示例代码:
import os
from ossie.utils import redhawk, sb
dom = redhawk.attach()
SDRROOT = os.getenv('SDRROOT')
waveform_dir = os.path.join(SDRROOT, 'dom', 'waveforms')
waveform_name = os.listdir(waveform_dir)[0]
app = dom.createApplication(waveform_name)
sink = sb.StreamSink()
app.connect(sink)