python和ROS Services如何在多线程中实现同步?

How to implement synchronization in multi threading with python and ROS Services?

我正在尝试使用 python 并使用互斥线程实现多线程(并行处理)。我有第一个进程检查压力值和调制解调器更新(在用 odom_callbackcallback_modem 函数实现的代码中),第二个进程调用 ROS SERVICES(在用 [=17= 实现的代码中) ] 服务器和 imu_client 客户端函数)。下面是python

中的实现代码
#!/usr/bin/env python3

from __future__ import print_function
import rospy
import numpy as np
from os import system
import time
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import threading 
import time
import serial
import serial.tools.list_ports
from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import ImuValue,ImuValueResponse

P0 = 1.01325 #Default Pressure 
mutex = threading.Lock()
Communication_Mode_ = 0
pub_pressure = rospy.Publisher('depth',Vector3,queue_size=1)
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def handle_ros_services(req):
    mutex.acquire(blocking=True)
    print("Server Read Data:")
    global T0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #print("Server Read Data:")
    T0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    T=T0
    temperature = T
    current_x_orientation_s = temperature
    print("Returning Service Temperature Data", current_x_orientation_s)
    return ImuValueResponse(current_x_orientation_s, True)
    mutex.release()

def ros_serice_server():
    s = rospy.Service('imu_value', ImuValue, handle_ros_services)
    print("Ready to get_value")

def odom_callback():
    # reentrang processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. send msg to serial port
    global P0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #P1 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P1 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    #P0 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    P = P0 # Relative Measured Pressure
    feedback =Vector3()
    feedback.x = 0    #Angular Velocity
    feedback.y = 0
    feedback.z = P/9.81 #Depth
    pressure = feedback.z
    print("Pressure : ", pressure)
    pub_pressure.publish(feedback)
    # reentrant processing
    mutex.release()
    
def callback_modem(event):
    # reentrant processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. check for incoming data
    event = Serial.Serial_Port_Receive_Data(20,0.2)
    if (event == 1): # Received data from acoustic modem
        modem_data= event
        pub_modem.publish(modem_data)
        print("received ")
    else:
        print("not received...... ")
    mutex.release()
 
if __name__ == '__main__':
    # initialize serial port here
    Serial.Serial_Port_Standard()
    rospy.init_node('imu_value')
    ros_serice_server()
    rospy.Timer(rospy.Duration(1), callback_modem) 
    while not rospy.is_shutdown():
        try:
            odom_callback()
        except:
            print('pass')

和客户端节点

#!/usr/bin/env python3

from __future__ import print_function
import rospy
import sys
import numpy as np
from os import system
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import time
import serial
import serial.tools.list_ports

from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import *

mutex = threading.Lock()
Communication_Mode_ = 0
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def imu_client():
    mutex.acquire(blocking=True)
    rospy.wait_for_service('imu_value')
    imu_value = rospy.ServiceProxy('imu_value', ImuValue)
    print("Request call send")
    resp1 = imu_value(0.05)
    return resp1.current_x_orientation_s
    mutex.release()

if __name__ == "__main__":
    rospy.init_node('client_node_f')
    while not rospy.is_shutdown():
        try:
            print("entering client")
            value = imu_client()
            print(value)
            time.sleep(1)
        except:
            print('pass')

所以输出如下。 ROS 服务服务器的第一个进程的输出是

Pressure :  0.10602446483180428
Server Read Data:
Returning Service Temperature Data 1.0401

然后在打电话给客户后我得到了

entering client
Request call send
1.0401
entering client

问题是在调用 ROS SERVICE 客户端节点后进程停止,因此不会继续第一个进程(压力值和调制解调器更新)。 ROS SERVICES 进程应该只按需调用并且应该暂停第一个进程(压力和调制解调器)然后应该恢复工作。那么,我需要为 ROS SERVICES 调用实现 SEMAPHORES 吗?如果是,它应该如何在代码中。所以我确实需要某种同步,对吗?请帮忙?

您的问题是:

def handle_ros_services(req):
    mutex.acquire(blocking=True)
    ...
    return ImuValueResponse(current_x_orientation_s, True)
    mutex.release()

由于 return 语句,从不执行发布。

你最后需要:

    value = ImValueResponse(...)
    mutex.release()
    return value

更好的方法是将互斥体用作 with 语句的一部分:

with mutex:
    do anything you want, knowing that the lock will be released
    at the end, even if you return or throw an exception.