打造家庭助理机器人OriginBot,我希望它能够识别并欢迎家庭成员。为此,我引入了“家人识别”功能,它由人脸检测和人脸识别两大核心部分组成。

「人脸检测」是识别摄像头图像中是否存在人脸的过程。我采用了经典的Haar cascades算法,并对其进行了优化,确保它能在ROS环境中高效运行。通过将ROS图像转换为OpenCV格式,我们能够在图像上准确地标出人脸位置,并在检测到人脸时进行标记。

「人脸识别」则是确定图像中人脸身份的高级技术。我选择了阿里云视觉智能开放平台。因为对于非算法专业人员来说,最方便。

人脸检测

人脸检测借鉴了https://www.guyuehome.com/45655里面的内容。我把其中的代码做了一些优化并添加了详细的注释,优化后的代码如下:

# 导入所需库
import cv2
import cv_bridge
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image

# 定义人脸检测节点
class FaceDetection(Node):
    def __init__(self, cascade_path, image_topic, output_topic):
        super().__init__('face_detection')  # 初始化节点,节点名为'face_detection'
        self.classifier_path = cascade_path  # haarcascade模型路径

        # 实例化cv_bridge对象,用来转换ROS图像和OpenCV图像
        self.bridge = cv_bridge.CvBridge()
        # 加载预训练的人脸检测模型
        self.face_cascade = cv2.CascadeClassifier(self.classifier_path)
        # 订阅图像主题,注册回调函数image_callback
        self.image_sub = self.create_subscription(Image, image_topic, self.image_callback, 10)
        # 创建Publisher,主题名为output_topic,队列长度为10
        self.pub = self.create_publisher(Image, output_topic, 10)

    # 定义图像回调函数
    def image_callback(self, msg):
        # 将接收到的ROS图像消息转化为OpenCV图像
        image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
        # 将图像转为灰度图,因为人脸检测需要灰度图
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # 进行人脸检测
        faces = self.face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.2,  # 表示每次图像尺寸减小的比例
            minNeighbors=3,  # 表示每一个目标至少要被检测到3次才算是真的目标
            minSize=(2020)  # 设置人脸的最小尺寸
        )

        # 如果检测到人脸,就在图像上画一个矩形框表示人脸
        if len(faces) > 0:
            for (x, y, w, h) in faces:
                cv2.rectangle(image, (x, y), (x + w, y + h), (25500), 2)

        # 将OpenCV图像转化为ROS图像消息,并发布出去
        self.pub.publish(self.bridge.cv2_to_imgmsg(image, 'bgr8'))

# 定义主函数
def main(args=None):
    rclpy.init(args=args)  # 初始化ROS
    face_detection = FaceDetection("haarcascade_frontalface_default.xml""/image_raw""/camera/process_image")  # 实例化FaceDetection节点
    rclpy.spin(face_detection)  # 开始循环,不断调用回调函数
    face_detection.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭ROS

# 如果直接运行这个文件,就执行main函数
if __name__ == '__main__':
    main()

这里的人脸检测算法是Haar cascades,这是一个比较旧的算法,可能在某些情况下无法检测到人脸或者产生误检。可以考虑使用一些更高级的算法,例如MTCNN,Dlib HOG或者Dlib CNN,以后再优化。

把上面这个代码封装到一个ros2 package中,编译之后就可以使用了。

人脸识别

目前比较常用的人脸识别算法是FaceNet。

以下引用内容有chatGPT4 产生

FaceNet是Google于2015年发布的一种深度学习的人脸识别系统。FaceNet的目标是将人脸图像映射到欧氏空间,使得同一人的不同图像之间的距离尽可能小,而不同人的图像之间的距离尽可能大。这种映射是通过一个深度卷积神经网络实现的,网络的结构可以是Inception模型,也可以是其他的模型。

「优点:」

  1. 高精度:FaceNet 在 LFW(Labeled Faces in the Wild)和 YouTube Faces DB 等公开数据集上都取得了最好的性能。
  2. 端到端学习:FaceNet是一个端到端的系统,整个系统(包括特征提取和度量学习)都可以一起优化。
  3. 实时性:由于网络可以直接输出嵌入向量,因此可以用于实时的人脸识别应用。

「缺点:」

  1. 训练难度大:FaceNet使用的三元组损失需要精心选择正例和负例,训练过程比较复杂。
  2. 需要大量标记数据:虽然FaceNet只需要身份标签,但是为了获得好的性能,仍然需要大量的训练数据。
  3. 对数据质量敏感:如果训练数据中有错误的标签,可能会对训练结果造成影响。

把这样一个比较大的算法直接部署在OriginBot上运行起来效果估计不会很好,毕竟需要的算力很大。再加上我本身不是做算法出身的,直接手撸FaceNet对我有点难,所以我最后选择使用阿里云的视觉智能开放平台。

阿里云视觉智能开放平台提供了一系列高效、易用的视觉智能API接口,旨在帮助用户轻松实现图像识别、视频分析、图像搜索等功能,从而提升业务效率和用户体验,这对我来说正适合。

以下是阿里云视觉智能开放平台的一些主要特点和功能:

  1. 「丰富的API接口」:平台提供了丰富的API接口,涵盖了图像识别、视频分析、图像搜索等多个领域。用户可以根据自己的需求选择合适的接口进行调用。其中就包含了我需要的功能。

  2. 「高度可定制」:用户可以根据自己的业务场景定制模型,例如通过训练自己的图像识别模型来识别特定的物体或场景。

  3. 「强大的图像识别能力」:平台支持识别多种类型的图像内容,包括物体、场景、人脸、文字等。此外,还可以进行图像风格转换、情感分析等高级功能。

  4. 「实时视频分析」:平台提供实时视频分析功能,可以对视频流进行实时处理,识别视频中的特定物体、场景或行为。

  5. 「图像搜索服务」:用户可以通过上传图片或提供图片URL,快速在海量图像库中找到相似的图片,支持以图搜图的功能。

  6. 「易用性和灵活性」:平台提供了完善的开发者文档和SDK,支持多种编程语言,方便用户快速集成和使用。同时,平台还提供了在线测试和调试工具,帮助用户快速验证和优化接口调用效果。

  7. 「安全可靠」:阿里云视觉智能开放平台基于阿里云的安全体系,确保用户数据的安全性和隐私性。

  8. 「弹性伸缩」:平台支持弹性伸缩,可以根据用户的业务需求自动调整资源,确保在高并发场景下的稳定性和性能。

为阿里云做个小广告吧,阿里云的这些服务对于非专业算法人员来说真的非常实用,而且降价后价格也不贵,个人使用的话完全负担得起。

我要使用的是其中的searchFace这个功能,详细的说明可以参考官方文档

简单来说,需要先创建一个人脸数据库,然后把家人的人脸照片传上去,上传的时候,照片需要以名字拼音命名,这样在识别的时候才能知道具体是谁。

阿里云官方有API调试台,可以在页面上直接调试,自动生成代码,最终的代码如下:

import os
import sys

from typing import List

from alibabacloud_facebody20191230.client import Client as facebody20191230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_facebody20191230 import models as facebody_20191230_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> facebody20191230Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """

        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
        # 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html。
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint 请参考 https://api.aliyun.com/product/facebody
        config.endpoint = f'facebody.cn-shanghai.aliyuncs.com'
        return facebody20191230Client(config)

    @staticmethod
    def main(
        args: List[str],
    )
 -> None:

        client = Sample.create_client()
        search_face_request = facebody_20191230_models.SearchFaceRequest(
            db_name='default',
            image_url='xxxxxxxxxxxxxxx',  # 这是需要备检测的图片
            limit=1,
            quality_score_threshold=80
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            client.search_face_with_options(search_face_request, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    )
 -> None:

        client = Sample.create_client()
        search_face_request = facebody_20191230_models.SearchFaceRequest(
            db_name='default',
            image_url='xxxxxxxxxx',
            limit=1,
            quality_score_threshold=80
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            await client.search_face_with_options_async(search_face_request, runtime)
        except Exception as error:
            # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            # 错误 message
            print(error.message)
            # 诊断地址
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

虽然这个代码可以达到人脸识别的目的了,但是还不能直接在OriginBot上运行,我们需要做一些调整:

  1. 代码中待识别的图片是通过url传进去的,需要改成从第一部分人脸检测的输出来接受
  2. 把代码改成一个ros2 的node更适合在OriginBot上运行

基于以上两点想法,我修改后的代码如下:

import os
import cv2
import cv_bridge
import base64
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from alibabacloud_facebody20191230.client import Client as facebody20191230Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_facebody20191230 import models as facebody_20191230_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class FaceRecognitionNode(Node):
    def __init__(self):
        super().__init__('face_recognition')  # 初始化节点,节点名为'face_recognition'
        
        self.classifier_path = "haarcascade_frontalface_default.xml"  # haarcascade模型路径
        self.bridge = cv_bridge.CvBridge()  # 实例化cv_bridge对象,用来转换ROS图像和OpenCV图像
        self.face_cascade = cv2.CascadeClassifier(self.classifier_path)  # 加载预训练的人脸检测模型
        self.image_sub = self.create_subscription(
            Image, '/camera/process_image', self.image_callback, 10
        )  # 订阅'/camera/process_image'主题,注册回调函数image_callback,设置队列长度为10
        
        self.pub = self.create_publisher(Image, 'recognized_faces'10)  # 创建Publisher,主题名为'recognized_faces',队列长度为10
        
        # 创建阿里云API客户端,用于调用人脸识别接口
        config = open_api_models.Config(
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = 'facebody.cn-shanghai.aliyuncs.com'
        self.client = facebody20191230Client(config)

    def image_callback(self, msg):
        image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.2,
            minNeighbors=3,
            minSize=(2020)
        )

        for (x, y, w, h) in faces:
            cv2.rectangle(image, (x, y), (x + w, y + h), (25500), 2)
        
        # 使用阿里云API进行人脸识别
        try:
            search_face_request = facebody_20191230_models.SearchFaceRequest(
                db_name='default',
                image_url=self.convert_image_to_base64(image),
                limit=1
            )
            runtime = util_models.RuntimeOptions()
            response = self.client.search_face_with_options(search_face_request, runtime)
            recognized_faces = response.data.get("FaceMatches", [])
            if recognized_faces:
                # 处理识别到的人脸信息
                for face in recognized_faces:
                    print(f"Recognized face: {face.get('PersonName')}")
        except Exception as error:
            self.get_logger().error('Failed to recognize face: %s' % error)
            return
        
        # 发布处理后的图像
        self.pub.publish(self.bridge.cv2_to_imgmsg(image, 'bgr8'))

    def convert_image_to_base64(self, image):
        """将OpenCV图像转换为Base64编码的字符串"""
        ret, buffer = cv2.imencode('.jpg', image)
        if not ret:
            raise Exception("Failed to encode image")
        base64_str = base64.b64encode(buffer).decode('utf8')
        return base64_str

def main(args=None):
    rclpy.init(args=args)  # 初始化ROS
    node = FaceRecognitionNode()  # 实例化FaceRecognitionNode节点
    rclpy.spin(node)  # 开始循环,不断调用回调函数
    node.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭ROS

if __name__ == '__main__':
    main()

代码中已经添加了很详细的注释了,不再赘述,唯一需要说明的是,ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET是阿里云的AKSK,关于如何生成它们以及应当赋予什么权限,需要严格根据官方文档来操作,否则会导致SDK调用失败。

待优化

大家可以看到,目前在人脸识别中,如果检测到是家人的话,仅仅只是打印出一些日志信息,代码如下:

for face in recognized_faces:
    print(f"Recognized face: {face.get('PersonName')}")

这里以后需要优化,应该要结合具体场景做出一些有意义的动作,比如在回家的时候说一些欢迎词、或者发现有人跌倒了可以知道具体是谁跌倒了等等。