如何在python中使用"multi-threading"与ROS服务和串口通信?
How to use "multi-threading" in python with ROS services and serial comunication?
我想 运行 在我的机器人中使用 ROS 服务和串行通信(在 Raspberry Pi 和 ESP32 微控制器之间)在 python 中进行多个处理。机器人中的软件原理图如图
我开始用 C++ 做,从这个 link 可以看出,但我意识到我的串行通信模块和 ODOM(压力,IMU)读取在 python .
所以,程序的结构应该是下面这样的。首先我打开串口(Serial communication of Raspberry PI 4)。当串行打开时两个进程 运行ning
第一个,主要的 运行 自动执行以下操作:线程请求 ODOM 更新(来自微控制器的压力和 IMU)并发布它们。此外,每 0.3 秒检查一次调制解调器收件箱,如果有新内容就会发布。
另一个仅根据 ROS 服务的要求检测到调制解调器收件箱中有新消息执行暂停(在第一个主进程上)并在串行端口上执行(发布)。然后第一个进程恢复正常工作
所以我首先尝试做一些看起来像这些的伪 python 代码但是我需要帮助,因为我是 python 和多线程的新手。在这里
#!/usr/bin/env python3
from sys import ps1
import rospy
import numpy as np
from os import system
import time
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Pressure_Functions as Pressure
from std_msgs.msg import Float32
mutex = threading.Lock()
from std_msgs.msg import String
Communication_Mode_ = 0 # Serial Communication
pub_pressure = rospy.Publisher('depth',Float32,queue_size=1)
P0 = 1.01325 #Default Pressure
def callback(data):
global P0
mutex.acquire()
while (Serial.Serial_Port_Standard()): # While serial Port is open
try:
data_received_pressure = Pressure.Pressure_Get_Final_Values(1,1)
data_received_imu = IMU.IMU_Get_Values(1, 1)
P1 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
P0 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
P = P1 - P0 # Relative Measured Pressure
pressure = P
pub_pressure.publish(pressure)
except:
print ("pressure not obtained")
Modem.Send_Data(Communication_Mode, receiver_ID, data_size, data_to_send)
def listener():
# Not sure how to check every 0.3 seconds the modem inbox and if something new it publish. And also the DEMAND from ROS Services detect that there is new message in the modem inbox do HALT( on the first main Process) and execute (publish) on the serial Port. Then the first process resume with normal work
对于调制解调器检查,我虽然需要这样的东西,但我不确定。
def callback_modem(data_in):
data_in = Serial.Serial_Port_Receive_Data(20,0.2)
if (data_in[0] == 91) # Received data from acoustic modem
rt = RepeatedTimer(1, data_in, "Recieved Data") # it auto-starts, no need of rt.start()
modem_data= data_in
pub_modem.publish(modem_data)
try:
sleep(0.3) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
ROS服务回调的第三个线程应该是这样的?
def handle_ros_service(req):
data_received_temperature = TEMPERATURE.TEMPERATURE_Get_Values(1, 1)
Tempetrature = (np.int16((data_received_imu[6]<<24) | (data_received_imu[7]<<16) | (data_received_imu[8]<<8) | (data_received_imu[9])))/10000
mutex.acquire(blocking=True)
return handle_ros_service(req.Tempetrature)
def publish_on_serial():
# send temperature over serial port
mutex.release()
您可以使用两个回调构建一个默认的简单节点,其中一个是您订阅的 ROS 主题的消息回调,另一个是对 TimerEvent 的回调,可以重复调用0.3 秒。您需要在 rospy.spin()
之前启动 TimerEvent。
代码可能如下所示:
#!/usr/bin/env python
import threading
import rospy
from std_msgs.msg import String
mutex = threading.Lock()
def msg_callback(msg):
# reentrang processing
mutex.acquire(blocking=True)
# work serial port here, e.g. send msg to serial port
mutex.release()
# reentrant processing
def timer_callback(event):
# reentrant processing
mutex.acquire(blocking=True)
# work serial port here, e.g. check for incoming data
mutex.release()
# reentrant processing, e.g. publish the data from serial port
def service_callback(req):
# read out temperature
mutex.acquire(blocking=True)
# send temperature over serial port
mutex.release()
if __name__ == '__main__':
# initialize serial port here
rospy.init_node('name')
rospy.Subscriber('/test_in', String, msg_callback)
rospy.Service('on_req', Empty, service_callback)
rospy.Timer(rospy.Duration(0.3), timer_callback)
rospy.spin()
事实上,rospy 为回调启动了两个不同的线程。但是,由于 GIL,这些线程在不同的内核上并不是 运行 并行的,但是它们会在某些系统调用时被调度并切换执行,例如。 g., 当调用 IO 操作时。因此,您仍然需要像您在问题中所做的那样将您的线程与 Lock 同步。
我想 运行 在我的机器人中使用 ROS 服务和串行通信(在 Raspberry Pi 和 ESP32 微控制器之间)在 python 中进行多个处理。机器人中的软件原理图如图
我开始用 C++ 做,从这个 link
所以,程序的结构应该是下面这样的。首先我打开串口(Serial communication of Raspberry PI 4)。当串行打开时两个进程 运行ning
第一个,主要的 运行 自动执行以下操作:线程请求 ODOM 更新(来自微控制器的压力和 IMU)并发布它们。此外,每 0.3 秒检查一次调制解调器收件箱,如果有新内容就会发布。
另一个仅根据 ROS 服务的要求检测到调制解调器收件箱中有新消息执行暂停(在第一个主进程上)并在串行端口上执行(发布)。然后第一个进程恢复正常工作
所以我首先尝试做一些看起来像这些的伪 python 代码但是我需要帮助,因为我是 python 和多线程的新手。在这里
#!/usr/bin/env python3
from sys import ps1
import rospy
import numpy as np
from os import system
import time
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Pressure_Functions as Pressure
from std_msgs.msg import Float32
mutex = threading.Lock()
from std_msgs.msg import String
Communication_Mode_ = 0 # Serial Communication
pub_pressure = rospy.Publisher('depth',Float32,queue_size=1)
P0 = 1.01325 #Default Pressure
def callback(data):
global P0
mutex.acquire()
while (Serial.Serial_Port_Standard()): # While serial Port is open
try:
data_received_pressure = Pressure.Pressure_Get_Final_Values(1,1)
data_received_imu = IMU.IMU_Get_Values(1, 1)
P1 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
P0 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
P = P1 - P0 # Relative Measured Pressure
pressure = P
pub_pressure.publish(pressure)
except:
print ("pressure not obtained")
Modem.Send_Data(Communication_Mode, receiver_ID, data_size, data_to_send)
def listener():
# Not sure how to check every 0.3 seconds the modem inbox and if something new it publish. And also the DEMAND from ROS Services detect that there is new message in the modem inbox do HALT( on the first main Process) and execute (publish) on the serial Port. Then the first process resume with normal work
对于调制解调器检查,我虽然需要这样的东西,但我不确定。
def callback_modem(data_in):
data_in = Serial.Serial_Port_Receive_Data(20,0.2)
if (data_in[0] == 91) # Received data from acoustic modem
rt = RepeatedTimer(1, data_in, "Recieved Data") # it auto-starts, no need of rt.start()
modem_data= data_in
pub_modem.publish(modem_data)
try:
sleep(0.3) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
ROS服务回调的第三个线程应该是这样的?
def handle_ros_service(req):
data_received_temperature = TEMPERATURE.TEMPERATURE_Get_Values(1, 1)
Tempetrature = (np.int16((data_received_imu[6]<<24) | (data_received_imu[7]<<16) | (data_received_imu[8]<<8) | (data_received_imu[9])))/10000
mutex.acquire(blocking=True)
return handle_ros_service(req.Tempetrature)
def publish_on_serial():
# send temperature over serial port
mutex.release()
您可以使用两个回调构建一个默认的简单节点,其中一个是您订阅的 ROS 主题的消息回调,另一个是对 TimerEvent 的回调,可以重复调用0.3 秒。您需要在 rospy.spin()
之前启动 TimerEvent。
代码可能如下所示:
#!/usr/bin/env python
import threading
import rospy
from std_msgs.msg import String
mutex = threading.Lock()
def msg_callback(msg):
# reentrang processing
mutex.acquire(blocking=True)
# work serial port here, e.g. send msg to serial port
mutex.release()
# reentrant processing
def timer_callback(event):
# reentrant processing
mutex.acquire(blocking=True)
# work serial port here, e.g. check for incoming data
mutex.release()
# reentrant processing, e.g. publish the data from serial port
def service_callback(req):
# read out temperature
mutex.acquire(blocking=True)
# send temperature over serial port
mutex.release()
if __name__ == '__main__':
# initialize serial port here
rospy.init_node('name')
rospy.Subscriber('/test_in', String, msg_callback)
rospy.Service('on_req', Empty, service_callback)
rospy.Timer(rospy.Duration(0.3), timer_callback)
rospy.spin()
事实上,rospy 为回调启动了两个不同的线程。但是,由于 GIL,这些线程在不同的内核上并不是 运行 并行的,但是它们会在某些系统调用时被调度并切换执行,例如。 g., 当调用 IO 操作时。因此,您仍然需要像您在问题中所做的那样将您的线程与 Lock 同步。