nidaqmx:防止任务在功能改变后关闭
nidaqmx: prevent task from closing after being altered in function
我正在尝试编写一个 API 来利用 NI-DAQmx 的 python 包装器,并且需要有一个可以跨模块编辑的全局任务列表。
这是我目前尝试过的方法:
1) 创建了一个可导入的任务字典,每当调用 ni-daqmx 时都会更新该字典。函数端点处理来自 HTTPS 请求的数据,我保证它不仅仅是 ni-daqmx 库本身的无意义包装。
例如,在启动时,会创建以下内容:
#./daq/__init.py__
import nidaqmx
# ... other stuff ...#
TASKS = {}
然后,用户可以通过调用此端点来创建任务
#./daq/task/task.py
from daq import TASKS
# ...
def api_create_task_endpoint(task_id):
try:
task = nidaqmx.Task(new_task_name=task_id)
TASKS[task_id] = task
except Exception:
# handle it
到这里为止的一切都正常工作。我可以获得任务列表,并且任务保持打开状态。我也试过显式调用 task.control(nidaqmx.constants.TaskMode.TASK_RESERVE)
,但无论如何下面的代码都会给我同样的问题。
当我尝试向任务添加通道时,无论我如何设置状态,它都会在函数调用结束时关闭。
#./daq/task/channels.py
from daq import TASKS
def api_add_channel_task_endpoint(task_id, channel_type, function):
# channel_type corresponds to ni-daqmx channel modules (e.g. ai_channels).
# function corresponds to callable functions (e.g. add_ai_voltage_chan)
# do some preliminary checks (e.g. task exists, channel type valid)
channels = get_chans_from_json_post()
with TASKS[task_id] as task:
getattr(getattr(task, channel_type), function)(channels)
# e.g. task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
这显然是在关闭任务。当我再次调用 api_create_task_endpoint(task_id)
时,我收到 DaqResourceWarning
任务已关闭,不再存在。
我同样尝试在此处使用 task.control
设置任务模式,但无济于事。
我希望能够通过将任务存储在模块范围的 TASKS
字典中来对任务进行编辑,但无法让任务打开足够长的时间来执行此操作。
2) 我还尝试使用 NI-MAX 保存功能来实现它。这个问题是任务无法保存,除非它们已经包含频道,我不一定想在创建任务后立即执行。
我试图通过向 api_create_task_endpoint()
添加一些默认行为来解决此问题,该默认行为仅添加一个随机频道,该频道在用户添加的第一个频道上被删除。
问题是,我找不到任何文档来说明在没有 GUI 的情况下添加频道后从任务中删除频道的方法(这是 CENTOS 上的 运行,因此 GUI 是行不通的)。
非常感谢您的帮助!
我还没有使用 NI-DAQmx 的 Python 绑定,但是
with TASKS[task_id] as task:
看起来它会在更新后立即停止并清除任务,因为程序流离开 with
块并执行 Task.__exit__()。
因为您希望这些任务在使用 Python 模块时仍然存在,所以我的建议是 仅 在您需要更改时使用 task.control()
任务状态。
我正在尝试编写一个 API 来利用 NI-DAQmx 的 python 包装器,并且需要有一个可以跨模块编辑的全局任务列表。
这是我目前尝试过的方法:
1) 创建了一个可导入的任务字典,每当调用 ni-daqmx 时都会更新该字典。函数端点处理来自 HTTPS 请求的数据,我保证它不仅仅是 ni-daqmx 库本身的无意义包装。
例如,在启动时,会创建以下内容:
#./daq/__init.py__
import nidaqmx
# ... other stuff ...#
TASKS = {}
然后,用户可以通过调用此端点来创建任务
#./daq/task/task.py
from daq import TASKS
# ...
def api_create_task_endpoint(task_id):
try:
task = nidaqmx.Task(new_task_name=task_id)
TASKS[task_id] = task
except Exception:
# handle it
到这里为止的一切都正常工作。我可以获得任务列表,并且任务保持打开状态。我也试过显式调用 task.control(nidaqmx.constants.TaskMode.TASK_RESERVE)
,但无论如何下面的代码都会给我同样的问题。
当我尝试向任务添加通道时,无论我如何设置状态,它都会在函数调用结束时关闭。
#./daq/task/channels.py
from daq import TASKS
def api_add_channel_task_endpoint(task_id, channel_type, function):
# channel_type corresponds to ni-daqmx channel modules (e.g. ai_channels).
# function corresponds to callable functions (e.g. add_ai_voltage_chan)
# do some preliminary checks (e.g. task exists, channel type valid)
channels = get_chans_from_json_post()
with TASKS[task_id] as task:
getattr(getattr(task, channel_type), function)(channels)
# e.g. task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
这显然是在关闭任务。当我再次调用 api_create_task_endpoint(task_id)
时,我收到 DaqResourceWarning
任务已关闭,不再存在。
我同样尝试在此处使用 task.control
设置任务模式,但无济于事。
我希望能够通过将任务存储在模块范围的 TASKS
字典中来对任务进行编辑,但无法让任务打开足够长的时间来执行此操作。
2) 我还尝试使用 NI-MAX 保存功能来实现它。这个问题是任务无法保存,除非它们已经包含频道,我不一定想在创建任务后立即执行。
我试图通过向 api_create_task_endpoint()
添加一些默认行为来解决此问题,该默认行为仅添加一个随机频道,该频道在用户添加的第一个频道上被删除。
问题是,我找不到任何文档来说明在没有 GUI 的情况下添加频道后从任务中删除频道的方法(这是 CENTOS 上的 运行,因此 GUI 是行不通的)。
非常感谢您的帮助!
我还没有使用 NI-DAQmx 的 Python 绑定,但是
with TASKS[task_id] as task:
看起来它会在更新后立即停止并清除任务,因为程序流离开 with
块并执行 Task.__exit__()。
因为您希望这些任务在使用 Python 模块时仍然存在,所以我的建议是 仅 在您需要更改时使用 task.control()
任务状态。