使用 python 订阅 ROS sensor_msg/Image
Subscribing to a ROS sensor_msg/Image using python
我正在尝试订阅由 vrep 视觉传感器发布的 ROS 节点。这是我的代码,在使用我的内置网络摄像头时可以正常工作:
import rospy
from sensor_msgs.msg import Image
from std_msgs.msg import String
from cv_bridge import CvBridge, CvBridgeError
import cv2
import numpy as np
import tensorflow as tf
import classify_image
class RosTensorFlow():
def __init__(self):
classify_image.maybe_download_and_extract()
self._session = tf.Session()
classify_image.create_graph()
self._cv_bridge = CvBridge()
self._sub = rospy.Subscriber('image', Image, self.callback, queue_size=1)
self._pub = rospy.Publisher('result', String, queue_size=1)
self.score_threshold = rospy.get_param('~score_threshold', 0.1)
self.use_top_k = rospy.get_param('~use_top_k', 5)
def callback(self, image_msg):
try:
cv_image = self._cv_bridge.imgmsg_to_cv2(image_msg, desired_encoding="passthrough")
except CvBridgeError as e:
print(e)
# copy from
# classify_image.py
image_data = cv2.imencode('.jpg', cv_image)[1].tostring()
# Creates graph from saved GraphDef.
softmax_tensor = self._session.graph.get_tensor_by_name('softmax:0')
predictions = self._session.run(
softmax_tensor, {'DecodeJpeg/contents:0': image_data})
predictions = np.squeeze(predictions)
# Creates node ID --> English string lookup.
node_lookup = classify_image.NodeLookup()
top_k = predictions.argsort()[-self.use_top_k:][::-1]
for node_id in top_k:
human_string = node_lookup.id_to_string(node_id)
score = predictions[node_id]
if score > self.score_threshold:
rospy.loginfo('%s (score = %.5f)' % (human_string, score))
self._pub.publish(human_string)
def main(self):
rospy.spin()
if __name__ == '__main__':
classify_image.setup_args()
rospy.init_node('rostensorflow')
tensor = RosTensorFlow()
tensor.main()
和我在 vrep 中的非线程子脚本,我基本上是从 rosInterfaceTopicPublisherAndSubscriber.ttt 教程中复制的:
function sysCall_init()
activeVisionSensor=sim.getObjectHandle('Vision_sensor')
pub=simROS.advertise('image', 'sensor_msgs/Image')
simROS.publisherTreatUInt8ArrayAsString(pub) -- treat uint8 arrays as strings (much faster, tables/arrays are kind of slow in Lua)
end
function sysCall_sensing()
-- Publish the image of the active vision sensor:
local data,w,h=sim.getVisionSensorCharImage(activeVisionSensor)
d={}
d['header']={seq=0,stamp=simROS.getTime(), frame_id="a"}
d['height']=h
d['width']=w
d['encoding']='rgb8'
d['is_bigendian']=1
d['step']=w*3
d['data']=data
simROS.publish(pub,d)
print("published")
end
function sysCall_cleanup()
simROS.shutdownPublisher(pub)
end
我是 运行 脚本:
python image_recognition.py
我没有收到任何错误消息,但就是没有输出。你能给我一个提示,我需要改变什么吗?我做了很多研究,但到目前为止没有任何成功。
成功了。
对于感兴趣的人,我是 运行:
rostopic list -v
这告诉我该节点实际上发布为:
/vrep_ros_interface/image
我刚刚在
中更改了它
rospy.Subscriber
并且脚本有效:)
我正在尝试订阅由 vrep 视觉传感器发布的 ROS 节点。这是我的代码,在使用我的内置网络摄像头时可以正常工作:
import rospy
from sensor_msgs.msg import Image
from std_msgs.msg import String
from cv_bridge import CvBridge, CvBridgeError
import cv2
import numpy as np
import tensorflow as tf
import classify_image
class RosTensorFlow():
def __init__(self):
classify_image.maybe_download_and_extract()
self._session = tf.Session()
classify_image.create_graph()
self._cv_bridge = CvBridge()
self._sub = rospy.Subscriber('image', Image, self.callback, queue_size=1)
self._pub = rospy.Publisher('result', String, queue_size=1)
self.score_threshold = rospy.get_param('~score_threshold', 0.1)
self.use_top_k = rospy.get_param('~use_top_k', 5)
def callback(self, image_msg):
try:
cv_image = self._cv_bridge.imgmsg_to_cv2(image_msg, desired_encoding="passthrough")
except CvBridgeError as e:
print(e)
# copy from
# classify_image.py
image_data = cv2.imencode('.jpg', cv_image)[1].tostring()
# Creates graph from saved GraphDef.
softmax_tensor = self._session.graph.get_tensor_by_name('softmax:0')
predictions = self._session.run(
softmax_tensor, {'DecodeJpeg/contents:0': image_data})
predictions = np.squeeze(predictions)
# Creates node ID --> English string lookup.
node_lookup = classify_image.NodeLookup()
top_k = predictions.argsort()[-self.use_top_k:][::-1]
for node_id in top_k:
human_string = node_lookup.id_to_string(node_id)
score = predictions[node_id]
if score > self.score_threshold:
rospy.loginfo('%s (score = %.5f)' % (human_string, score))
self._pub.publish(human_string)
def main(self):
rospy.spin()
if __name__ == '__main__':
classify_image.setup_args()
rospy.init_node('rostensorflow')
tensor = RosTensorFlow()
tensor.main()
和我在 vrep 中的非线程子脚本,我基本上是从 rosInterfaceTopicPublisherAndSubscriber.ttt 教程中复制的:
function sysCall_init()
activeVisionSensor=sim.getObjectHandle('Vision_sensor')
pub=simROS.advertise('image', 'sensor_msgs/Image')
simROS.publisherTreatUInt8ArrayAsString(pub) -- treat uint8 arrays as strings (much faster, tables/arrays are kind of slow in Lua)
end
function sysCall_sensing()
-- Publish the image of the active vision sensor:
local data,w,h=sim.getVisionSensorCharImage(activeVisionSensor)
d={}
d['header']={seq=0,stamp=simROS.getTime(), frame_id="a"}
d['height']=h
d['width']=w
d['encoding']='rgb8'
d['is_bigendian']=1
d['step']=w*3
d['data']=data
simROS.publish(pub,d)
print("published")
end
function sysCall_cleanup()
simROS.shutdownPublisher(pub)
end
我是 运行 脚本:
python image_recognition.py
我没有收到任何错误消息,但就是没有输出。你能给我一个提示,我需要改变什么吗?我做了很多研究,但到目前为止没有任何成功。
成功了。 对于感兴趣的人,我是 运行:
rostopic list -v
这告诉我该节点实际上发布为:
/vrep_ros_interface/image
我刚刚在
中更改了它rospy.Subscriber
并且脚本有效:)