Как получить конечное количество образцов в будущем, используя UHD/GNURadio?
Я использую интерфейс Python GNURadio для UHD и пытаюсь установить определенное время, чтобы начать сбор образцов и либо собирать определенное количество образцов, либо остановить сбор образцов в определенное время. По сути, создание синхронизированного снимка образцов. Это что-то похожее на пример C++ Ettus UHD 'rx_timed_sample'.
Я могу заставить потоковую диаграмму запускаться в определенное время, но я не могу заставить ее остановиться в определенное время (по крайней мере, не вызывая переполнения). Я также пытался сделать конечное приобретение, которое работает, но я не могу заставить его начать в определенное время. Так что я немного растерялся, что делать дальше.
Вот моя попытка конечного захвата (кажется, просто игнорирует время запуска и собирает 0 образцов):
num_samples = 1000
usrp = uhd.usrp_source(
",".join(("", "")),
uhd.stream_args(
cpu_format="fc32",
channels=range(1),
),
)
...
usrp.set_start_time(absolute_start_time)
samples = usrp.finite_acquisition(num_samples)
Я также попробовал некоторые комбинации следующих безуспешно (TypeError: в методе 'usrp_source_sptr_issue_stream_cmd', аргумент 2 типа '::uhd::stream_cmd_t const &'):
usrp.set_command_time(absolute_start_time)
usrp.issue_stream_cmd(uhd.stream_cmd.STREAM_MODE_NUM_SAMPS_AND_DONE)
Я также попробовал следующее в блок-схеме:
...
usrp = flowgrah.uhd_usrp_source_0
absolute_start_time = uhd.uhd_swig.time_spec_t(start_time)
usrp.set_start_time(absolute_start_time)
flowgrah.start()
stop_cmd = uhd.stream_cmd(uhd.stream_cmd.STREAM_MODE_STOP_CONTINUOUS)
absolute_stop_time = absolute_start_time + uhd.uhd_swig.time_spec_t(collection_time)
usrp.set_command_time(absolute_stop_time)
usrp.issue_stream_cmd(stop_cmd)
По какой-то причине генерируемый потоковый граф последовательно переполняется для чего-либо большего, чем время сбора.02s.
1 ответ
Я столкнулся с подобной проблемой и решил ее с помощью head
блок.
Вот простой пример, который сохраняет 10000 сэмплов из источника синусоидальной волны и затем завершается.
#!/usr/bin/env python
# Evan Widloski - 2017-09-03
# Logging test in gnuradio
from gnuradio import gr
from gnuradio import blocks
from gnuradio import analog
class top_block(gr.top_block):
def __init__(self, output):
gr.top_block.__init__(self)
sample_rate = 32e3
num_samples = 10000
ampl = 1
source = analog.sig_source_f(sample_rate, analog.GR_SIN_WAVE, 100, ampl)
head = blocks.head(4, num_samples)
sink = blocks.file_sink(4, output)
self.connect(source, head)
self.connect(head, sink)
if __name__ == '__main__':
try:
top_block('/tmp/out').run()
except KeyboardInterrupt:
pass