功能

一、Gmapping建图

二、自主导航 起始点 、终点

三、人脸识别

四、语音控制

完成任务: 机器人先建图 建完图后给出目标点,机器人就可以完成调用自主导航走到目标点,期间会调用激光雷达扫描局部环境来进行自主避障,到达终点后进行语音播报和人脸识别

主要功能文件

按照工作目录来讲

一、Gmapping

就是开启运动服务器 然后通过语音控制或者键盘控制让机器人跑一遍地图,在跑的时候机器人会调用激光雷达进行环境扫描 ,绘制地图

二、自主导航

给定机器人初始路径点,结束路径点并存入文件,有起始位置,有终点位置,机器人就能使用move_base动作服务器将机器人导航到每个路径点,运行时出现障碍物 激光雷达进行环境扫描 绘制出局部地图 进行自主避障

auto_slam.py

#!/usr/bin/env python

import rospy

import actionlib

import roslaunch

from actionlib_msgs.msg import *

from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal

from nav_msgs.msg import Path

from std_msgs.msg import String

from geometry_msgs.msg import PoseWithCovarianceStamped

from tf_conversions import transformations

from xml.dom.minidom import parse

from math import pi

import tf

###定义对象

class navigation_demo:

def __init__(self):

self.set_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=5)

self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction)### 运动信息的节点(动作服务器)

self.move_base.wait_for_server(rospy.Duration(60))###用于等待与动作服务器的连接建立。

self.tf_listener = tf.TransformListener()### 监听阵列

self.get_point = rospy.Publisher('get_pos', String, queue_size=5)

self.plist = []

self.success_count = 0

def set_plist(self,plist):

self.plist = plist

## 初始化机器人姿态

def set_pose(self, p):

if self.move_base is None:

return False

x, y, th = p

pose = PoseWithCovarianceStamped()

pose.header.stamp = rospy.Time.now()

pose.header.frame_id = 'map'

pose.pose.pose.position.x = x

pose.pose.pose.position.y = y

q = transformations.quaternion_from_euler(0.0, 0.0, th/180.0*pi)

pose.pose.pose.orientation.x = q[0]

pose.pose.pose.orientation.y = q[1]

pose.pose.pose.orientation.z = q[2]

pose.pose.pose.orientation.w = q[3]

self.set_pose_pub.publish(pose)

return True

# 当导航行为完成时的回调函数

def _done_cb(self, status, result):

rospy.loginfo("navigation done! status:%d result:%s"%(status, result))

# 当导航行为激活时的回调函数

def _active_cb(self):

rospy.loginfo("[Navi] navigation has be actived")

# 导航过程中的反馈回调函数

def _feedback_cb(self, feedback):

rospy.loginfo("[Navi] navigation feedback\r\n%s"%feedback)

def goto(self, p):

goal = MoveBaseGoal()### 定义MoveBaseGoal对象 进行

goal.target_pose.header.frame_id = 'map' ###建立坐标’

### 设置goal的移动目标地点

goal.target_pose.header.stamp = rospy.Time.now()

goal.target_pose.pose.position.x = p[0]

goal.target_pose.pose.position.y = p[1]

goal.target_pose.pose.position.z = p[2]

#q = transformations.quaternion_from_euler(0.0, 0.0, p[2]/180.0*pi)###欧拉数转化为四元数,三维空间的旋转方向

goal.target_pose.pose.orientation.x = p[3]

goal.target_pose.pose.orientation.y = p[4]

goal.target_pose.pose.orientation.z = p[5]

goal.target_pose.pose.orientation.w = p[6]

### 发送导航目标,并指定回调函数

self.move_base.send_goal(goal, self._done_cb, self._active_cb, self._feedback_cb)

# 等待导航结果,超时时间为60秒

result = self.move_base.wait_for_result(rospy.Duration(60))### 是否到达这个导航点

print(result)

state = self.move_base.get_state()

if state == GoalStatus.SUCCEEDED:

self.success_count += 1

### 到达的导航点是否为最终目标点

if len(self.plist) == self.success_count:

rospy.loginfo("arrived goal point")

self.get_point.publish("1")

self.isSendVoice = False

return True

def cancel(self):

self.move_base.cancel_all_goals()

return True

###定义回调函数

def callback(msg):###调用回调函数 向订阅话题发消息 就会调用回调函数

doc = parse("/home/bcsh/waypoints.xml")### parse对象处理xml文档 Dom

root_element = doc.documentElement###文档根结点

points = root_element.getElementsByTagName("Waypoint")### 每个航点包含七个

plist = []

rospy.loginfo("set pose...")

navi = navigation_demo() ##创建一个navigation_demo对象

for p in points:

point = [0] * 7

point[0] = float(p.getElementsByTagName("Pos_x")[0].childNodes[0].data)

point[1] = float(p.getElementsByTagName("Pos_y")[0].childNodes[0].data)

point[2] = float(p.getElementsByTagName("Pos_z")[0].childNodes[0].data)

###三维空间旋转方向的四元数

point[3] = float(p.getElementsByTagName("Ori_x")[0].childNodes[0].data)

point[4] = float(p.getElementsByTagName("Ori_y")[0].childNodes[0].data)

point[5] = float(p.getElementsByTagName("Ori_z")[0].childNodes[0].data)

point[6] = float(p.getElementsByTagName("Ori_w")[0].childNodes[0].data)

plist.append(point)

print(plist)

rospy.loginfo("goto goal...")

navi.set_plist(plist)

for waypoint in plist:

#print(waypoint)

navi.goto(waypoint)

if __name__ == "__main__":

rospy.init_node('auto_slam_node',anonymous=True)#### 初始化ROS节点,命名'auto_slam_node'

rospy.Subscriber("auto_slam", String,callback)###订阅 "auto_slam" 话题并设置回调函数处理消息

rospy.spin()

r = rospy.Rate(0.2)# 创建一个rate对象以控制循环速率

r.sleep()

首先第一步完成建图

关于waypoints.xml

创建完图之后,用Rviz 插件 waterplus_map_tools 通过输入指令进行航点标注,

三、 人脸识别

Take_photo.py

照片存放位置

Face_Rec.py

1.Take_photo.py

拍照 存储 调用人脸识别

TakePhoto类继承了之前 ROS 与 Opencv 接口类,在这个类里面我们重写了 process_imag 函数,使得该函数可以完成人脸识别功能。核心函数为 detectMultiScale 函数,这个函数实现了将视频中的人脸提取出来,反馈值为 faces,faces 是由多个数组组成,每个数组代表人脸在当前图像中的位置(x,y,w,h)分别代表人脸框的左上角点的坐标,人脸框的宽度和长度。

#!/usr/bin/env python

import rospy

import cv2

from ros_opencv import ROS2OPENCV

import sys, select, os

# 定义一个类 TakePhoto,继承 ROS2OPENCV 类

class TakePhoto(ROS2OPENCV):

def __init__(self, node_name):

# 调用 ROS2OPENCV 类的构造函数

super(TakePhoto, self).__init__(node_name)

self.detect_box = None ##用于存储检测到的人脸的框的坐标信息。。

self.result = None ###存储处理后的图像,其中人脸被矩形框标记

self.count = 0 ##用于计数保存的人脸图像数量,初始化为 0,每次按下 'p' 键保存一张图像时递增。

self.person_name = rospy.get_param('~person_name', 'name_one')

self.face_cascade = cv2.CascadeClassifier('/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')###Haar 级联分类器 存放一组描述人脸特征的模型,用来识别人脸

self.dirname = "/home/bcsh/robot_ws/src/match_mini/scripts/p1/" + self.person_name + "/"

self.X = None

self.Y = None

# 定义图像处理函数

def process_image(self, frame):

# print("sss")

src = frame.copy()##复制输入的图像帧,以便在不修改原始数据的情况下进行处理。

gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)##将复制的图像帧转换为灰度图像,因为 Haar 级联分类器通常在灰度图像上执行人脸检测。

faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)###使用预训练的 Haar 级联分类器检测灰度图像中的人脸。detectMultiScale 返回一个包含检测到的人脸位置坐标的列表。

result = src.copy() ###以便在上面绘制矩形框。self.result 保存了处理后的图像。

self.result = result

#### 遍历检测到的人脸,并在图像上画矩形框

### 遍历检测到的人脸坐标,将每个人脸用蓝色矩形框标记在图像上。同时,如果按下 'p' 键并且 self.count 小于 20,将当前人脸图像保存到指定的目录,并递增 self.count。

for (x, y, w, h) in faces:

### 给人脸用矩阵框住 左上角,长度,宽度,颜色等参数

result = cv2.rectangle(result, (x, y), (x+w, y+h), (255, 0, 0), 2)

f = cv2.resize(gray[y:y+h, x:x+w], (200, 200))##对存储图片尺寸进行处理

if self.count<20:

# 如果按下 'p' 键,保存人脸图像

if key == 'p' :

cv2.imwrite(self.dirname + '%s.pgm' % str(self.count), f)

self.count += 1

return result

if __name__ == '__main__':

try:

# 初始化节点并运行

node_name = "take_photo_rec"

TakePhoto(node_name)

rospy.spin()

except KeyboardInterrupt:

print "Shutting down face detector node."

cv2.destroyAllWindows()

Face_Rec.py

#!/usr/bin/env python

# encoding: utf-8

import sys,os,cv2

import numpy as np

import rospy

from geometry_msgs.msg import Twist

from std_msgs.msg import String

pub = rospy.Publisher('cmd_vel', Twist, queue_size = 1)

speed = 0.3

turn = 1.0

face_path = "/home/bcsh/robot_ws/src/match_mini/scripts/data"

face_name = ""

def read_images(path, sz=None):

c = 0

X, y = [], []

names = []

for dirname, dirnames, filenames in os.walk(path):

for subdirname in dirnames:

subject_path = os.path.join(dirname, subdirname)

for filename in os.listdir(subject_path):

try:

if (filename == ".directory"):

continue

filepath = os.path.join(subject_path, filename)

im = cv2.imread(os.path.join(subject_path, filename), cv2.IMREAD_GRAYSCALE)

if (im is None):

print("image" + filepath + "is None")

if (sz is not None):

im = cv2.resize(im, sz)

X.append(np.asarray(im, dtype=np.uint8))

y.append(c)

except:

print("unexpected error")

raise

c = c + 1

names.append(subdirname)

###函数返回一个包含主题名称(names)、图像数据(X)和相应标签(y)的列表。

return [names, X, y]

def face_rec():

[names,X, y] = read_images(face_path)

y = np.asarray(y, dtype=np.int32)

#model = cv2.face_EigenFaceRecognizer.create()

### 创建训练模型

model = cv2.face.LBPHFaceRecognizer_create()

model.train(np.asarray(X), np.asarray(y))

face_cascade = cv2.CascadeClassifier(

'/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')

cap = cv2.VideoCapture(0)

###调用cv的图象识别

### 大筐筐 视图

cv2.namedWindow("face_detector",0) ##框框名字

cv2.resizeWindow("face_detector",480,320)## 框框大小

while True:

ret, frame = cap.read()## frame 传过来的一帧图片

### 对图片进行处理

x, y = frame.shape[0:2]

small_frame = cv2.resize(frame, (int(y / 2), int(x / 2)))

result = small_frame.copy()

gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)

faces = face_cascade.detectMultiScale(gray, 1.3, 5)##人脸在当前图像中的位置(x,y,w,h)

for (x, y, w, h) in faces:

### 小框框

result = cv2.rectangle(result, (x, y), (x + w, y + h), (255, 0, 0), 2)

roi = gray[y:y + h,x:x + w]

try:

roi = cv2.resize(roi, (200, 200), interpolation=cv2.INTER_LINEAR)

### 模型预测 对新图像 p_label,p_confidence进行预测

[p_label, p_confidence] = model.predict(roi)

cv2.putText(result, names[p_label], (x, y - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2)

print("p_confidence = " + str( ) +" name=" + names[p_label])

if p_confidence<60 and names[p_label] == face_name:

### 机器人停止位置有一段距离 所以因为距离误差,就得实际改变 置信度 因为要一直往一个方向走 所以p_confidence必须小于 所以只能实际情况确定来小于 机器人能识别置信度的最大值

offset_x = ((x+w) / 2 - 240)

target_area = w * h###摄像头看见人脸的目标区域

linear_vel = 0

angular_vel = 0

print(target_area)

## 到一定距离才能识别

if target_area<100:

linear_vel = 0.0

elif target_area >110:

linear_vel = 0.3

else:

linear_vel = 0.0

if offset_x > 0:

angular_vel = 0.1

if offset_x < 0:

angxular_vel = -0.1

update_cmd(linear_vel,angular_vel)

except:

continue

#update_cmd(linear_vel,angular_vel)

cv2.imshow("face_detector", result)

if cv2.waitKey(30) & 0xFF == ord('q'):

break

cap.release()

cv2.destroyAllWindows()

def update_cmd(linear_speed, angular_speed):

twist = Twist()

twist.linear.x = 1*linear_speed; twist.linear.y = 1*linear_speed; twist.linear.z = 1*linear_speed;

twist.angular.x = 0; twist.angular.y = 0; twist.angular.z = 1*angular_speed

pub.publish(twist)

def callback(msg):

global face_path

global face_name

if msg.data == "liwei":

face_name = "liwei"

if msg.data == "yaom":

face_name = "yaom"

face_rec()

if __name__ == "__main__":

rospy.init_node('face_detector')

rospy.Subscriber("auto_face", String, callback)###订阅消息 定义回调函数

rospy.spin()

四、语音控制

voicecontroller.py

#!/usr/bin/env python

# -*- coding: UTF-8 -*-

import sys

reload(sys)

sys.setdefaultencoding('utf8')

import os

import rospy

from respeaker_interface import x

from respeaker_audio import RespeakerAudio

from std_msgs.msg import String

class VoiceController(object):

def __init__(self, node_name):

self.node_name = node_name

rospy.init_node(node_name)

rospy.on_shutdown(self.shutdown)

self.respeaker_interface = RespeakerInterface()

self.respeaker_audio = RespeakerAudio()

self.ask_pub = rospy.Publisher('cmd_msg', String, queue_size=5)

def shutdown(self):

self.respeaker_interface.close()

self.respeaker_audio.stop()

def callback(msg):

os.system("mpg123 /home/bcsh/robot_ws/src/match_mini/voice/zhuabu.mp3")

if __name__ == '__main__':

voice_controller = VoiceController("voice_controller")

auto_slam = rospy.Publisher('auto_slam', String, queue_size=10) # 定义了话题对象auto_slam 发布话题的时候会调用话题的回调函数

auto_face = rospy.Publisher('auto_face', String, queue_size=10) # 定义了话题对象auto_face 发布话题的时候会调用话题的回调函数

rospy.Subscriber("get_pos", String,callback, queue_size=10)

rate = rospy.Rate(100)

isPub = False

while not rospy.is_shutdown():

text = voice_controller.respeaker_audio.record()### 记录音频输入流

if text.find("开始") >= 0 and isPub is not True:

auto_slam.publish("start")

isPub = True

if text.find("右") >= 0:

print("send liwei to auto_face")

auto_face.publish("liwei")

elif text.find("偷") >= 0:

print("send yaom to auto_face")

auto_face.publish("yaom")

direction = voice_controller.respeaker_interface.direction

print(text)

print(direction)

rate.sleep()

用到的类

RespeakerInterface 类用于与 Respeaker 设备进行通信

respeaker_audio.py

#!/usr/bin/env python

import pyaudio

from baidu_speech_api import BaiduVoiceApi

import json

import sys

import os

from aip.speech import AipSpeech

from contextlib import contextmanager

# 重新设置默认字符编码为 utf-8

reload(sys)

sys.setdefaultencoding("utf-8")

# 定义音频采样参数

CHUNK = 1024

RECORD_SECONDS = 5

# 百度语音识别 API 的应用参数

APP_ID = '41721436'

API_KEY = 'QG7UA5m5YZC0PLTw3qWzh2Xd'

SECRET_KEY = 'Y9Q22OM13s2oXLzMUzETiQk96SX7Geq3'

@contextmanager

def ignore_stderr(enable=True):

"""

用于忽略标准错误流的上下文管理器。

"""

if enable:

devnull = None

try:

devnull = os.open(os.devnull, os.O_WRONLY)

stderr = os.dup(2)

sys.stderr.flush()

os.dup2(devnull, 2)

try:

yield

finally:

os.dup2(stderr, 2)

os.close(stderr)

finally:

if devnull is not None:

os.close(devnull)

else:

yield

class RespeakerAudio(object):

def __init__(self, channel=0, suppress_error=True):

"""

初始化 RespeakerAudio 类。

"""

# 忽略标准错误流以避免输出 PyAudio 警告信息

with ignore_stderr(enable=suppress_error):

self.pyaudio = pyaudio.PyAudio()

# 初始化音频参数和设备信息

self.channels = None

self.channel = channel

self.device_index = None

self.rate = 16000

self.bitwidth = 2

self.bitdepth = 16

# 查找 Respeaker 设备

count = self.pyaudio.get_device_count()

for i in range(count):

info = self.pyaudio.get_device_info_by_index(i)

name = info["name"].encode("utf-8")

chan = info["maxInputChannels"]

# 如果设备名中包含 "respeaker",则认为是 Respeaker 设备

if name.lower().find("respeaker") >= 0:

self.channels = chan

self.device_index = i

break

# 如果没有找到 Respeaker 设备,则使用默认输入设备

if self.device_index is None:

info = self.pyaudio.get_default_input_device_info()

self.channels = info["maxInputChannels"]

self.device_index = info["index"]

# 确保选择的通道在有效范围内

self.channel = min(self.channels - 1, max(0, self.channel))

# 打开音频输入流

self.stream = self.pyaudio.open(

rate=self.rate,

format=self.pyaudio.get_format_from_width(self.bitwidth),

channels=1,

input=True,

input_device_index=self.device_index,

)

# 初始化百度语音 API

self.aipSpeech = AipSpeech(APP_ID, API_KEY, SECRET_KEY)

self.baidu = BaiduVoiceApi(appkey=API_KEY, secretkey=SECRET_KEY)

def stop(self):

"""

停止音频输入流。

"""

# 停止音频输入流

self.stream.stop_stream()

self.stream.close()

self.stream = None

# 终止 PyAudio

self.pyaudio.terminate()

def generator_list(self, lst):

"""

生成列表的生成器。

"""

for l in lst:

yield l

def record(self):

"""

录制音频并发送到百度语音识别 API 进行识别。

"""

# 启动音频输入流

self.stream.start_stream()

print("* recording")

frames = [] # 用于存储音频帧

# 录制指定的音频

for i in range(0, int(self.rate / CHUNK * RECORD_SECONDS)):

data = self.stream.read(CHUNK)

frames.append(data)

print("done recording")

# 停止音频输入流

self.stream.stop_stream()

print("start to send to Baidu")

# 将录制的音频发送到百度语音识别 API 进行识别

text = self.baidu.server_api(self.generator_list(frames))

# 解析识别结果并返回

if text:

try:

text = json.loads(text)####

for t in text['result']:

print(t)

return str(t)

except KeyError:

return "get nothing"

else:

print("get nothing")

return "get nothing"

if __name__ == '__main__':

# 创建 RespeakerAudio 实例

snowman_audio = RespeakerAudio()

# 持续录制并输出识别结果

while True:

text = snowman_audio.record()

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: