如何在 gnuradio 中测试源代码块
How to test source blocks in gnuradio
我正在尝试在 gnuradio 中创建一个简单的源代码块。我已经使用 gr_modtool 来创建基本的模块和块,但是每当我尝试 运行 测试时,它很快就会耗尽我所有的内存并且我的计算机开始滞后。更糟糕的是,测试失败 "thread[thread-per-block[1]: ]: std::bad_alloc"
这是我的块(称为 csv):
import numpy
from gnuradio import gr
class csv(gr.sync_block):
"""
docstring for block csv
"""
def __init__(self, filename):
gr.sync_block.__init__(self, name="csv",
in_sig=None,
out_sig=[numpy.float32])
def work(self, input_items, output_items):
out = output_items[0]
out[:] = 0
return len(output_items[0])
这是我用来测试的代码:
from gnuradio import gr, gr_unittest
from gnuradio import blocks
from csv import csv
import time
class qa_csv (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
def tearDown (self):
self.tb = None
def test_001_t (self):
expected = [0.0]
# set up fg
c = csv(None)
sink = blocks.vector_sink_f()
self.tb.connect(c, sink)
self.tb.run()
# check data
results = sink.data()
self.assertFloatTuplesAlmostEqual(expected, results)
if __name__ == '__main__':
gr_unittest.run(qa_csv, "qa_csv.xml")
任何人都可以帮我找出我哪里错了或指出正确的方向吗?
几件事:
默认情况下,流程图将 运行 直到它被告知停止。所以这就是为什么你的记忆被吞噬了。 GNU Radio 正在抛出 std::bad_alloc
,因为你一直在 vector_sink
中填充东西,最终它 运行 超出 RAM。
您需要停止流程图。有几种方法可以做到这一点:
Return WORK_DONE
(-1) 来自块的 work
函数。这适用于当块有有限数量的数据要服务然后发出完成信号时。以 vector_source
为例。
使用head
block,它将复制N个样本,然后为您return WORK_DONE
。 这对单元测试很有用。
最后一点:一旦你开始工作,你的测试仍然会失败(除非你请求 head
只复制 1 个样本)因为你已经写好了,你的源代码块将填满它的整个输出每次调用时用零缓冲:
>>> import numpy as np
>>> out = np.empty(10, dtype=float)
>>> out[:] = 0
>>> out
array([ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
所以只要确保您的 expected
数组具有您从 head
请求的相同数量的样本:
>>> from gnuradio import gr, blocks
>>> n = 1000
>>> head = blocks.head(gr.sizeof_float, n)
>>> expected = np.zeros(n)
我正在尝试在 gnuradio 中创建一个简单的源代码块。我已经使用 gr_modtool 来创建基本的模块和块,但是每当我尝试 运行 测试时,它很快就会耗尽我所有的内存并且我的计算机开始滞后。更糟糕的是,测试失败 "thread[thread-per-block[1]: ]: std::bad_alloc"
这是我的块(称为 csv):
import numpy
from gnuradio import gr
class csv(gr.sync_block):
"""
docstring for block csv
"""
def __init__(self, filename):
gr.sync_block.__init__(self, name="csv",
in_sig=None,
out_sig=[numpy.float32])
def work(self, input_items, output_items):
out = output_items[0]
out[:] = 0
return len(output_items[0])
这是我用来测试的代码:
from gnuradio import gr, gr_unittest
from gnuradio import blocks
from csv import csv
import time
class qa_csv (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
def tearDown (self):
self.tb = None
def test_001_t (self):
expected = [0.0]
# set up fg
c = csv(None)
sink = blocks.vector_sink_f()
self.tb.connect(c, sink)
self.tb.run()
# check data
results = sink.data()
self.assertFloatTuplesAlmostEqual(expected, results)
if __name__ == '__main__':
gr_unittest.run(qa_csv, "qa_csv.xml")
任何人都可以帮我找出我哪里错了或指出正确的方向吗?
几件事:
默认情况下,流程图将 运行 直到它被告知停止。所以这就是为什么你的记忆被吞噬了。 GNU Radio 正在抛出 std::bad_alloc
,因为你一直在 vector_sink
中填充东西,最终它 运行 超出 RAM。
您需要停止流程图。有几种方法可以做到这一点:
Return
WORK_DONE
(-1) 来自块的work
函数。这适用于当块有有限数量的数据要服务然后发出完成信号时。以vector_source
为例。使用
head
block,它将复制N个样本,然后为您returnWORK_DONE
。 这对单元测试很有用。
最后一点:一旦你开始工作,你的测试仍然会失败(除非你请求 head
只复制 1 个样本)因为你已经写好了,你的源代码块将填满它的整个输出每次调用时用零缓冲:
>>> import numpy as np
>>> out = np.empty(10, dtype=float)
>>> out[:] = 0
>>> out
array([ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
所以只要确保您的 expected
数组具有您从 head
请求的相同数量的样本:
>>> from gnuradio import gr, blocks
>>> n = 1000
>>> head = blocks.head(gr.sizeof_float, n)
>>> expected = np.zeros(n)