前言

之前写过几篇基于OriginBot的家庭监控方案的博客,后来搁置了很久。

最近打算重新开启这个计划,但是想了很久发现“家庭监控方案”这个描述没有准确地表述我的想法,所以重新给这个计划想了一个名字,叫做“OriginBot家庭助理”。

关于这个计划,目前还没有非常完善的想法,初步打算把之前开发的监控和语音控制这两个空能迁移过来,并且重写前端,之前的前端是利用一些现有的代码拼接的,比较乱,也有点丑,所以打算这次也重写,好好经营这个计划。

项目框架介绍

这个项目在代码上会分为三个部分:前端、后端和OriginBot控制三个部分,下面单独介绍。

代码库在这里:originbot_home_assistant

1. 前端

这次采用前后段分离架构,基于VUE3+ElementPlus开发前端,这样能最大程度利用开源框架的现有的功能,在UI效果和开发量之前做一个平衡。

具体介绍之前,先看一下效果:

首页是语音控制的交互页面,后面会在左上角红框出添加不同的按钮导向到不同功能的交互页面。

把代码仓库克隆下来之后,进入到frontend目录下,执行以下命令行以运行前端

npm install  # 安装第三方依赖包 
npm run dev  # 以开发模式运行VUE3

由于前端部分涉及到一些VUE3的内容,我不打算写代码实现的部分,有兴趣的同学可以看我以前写的关于VUE3的系列博客

2. 后端

后端是基于Django + Django Rest Framework 开发的。

Django是一个非常完善、好用的python web后端框架,Django Rest Framework为其提供了API能力。

进入到代码仓库的backend目录下,执行以下命令:

pip install -r requirements.tct
python manage.py migrate
python manage.py runserver  # 以开放的模式在本地运行

跟前端的代码一样,后端部分涉及到很多跟Django和web开发相关的细节,我不会在这里详细写,有兴趣的同学同样可以参考我以前写的博客

3. OriginBot控制

OriginBot控制部分代码会嵌套在后端的代码里面,所以放在了 backend/utils/ 这个目录里面。

目前只有一个相关的代码,是跟语音控制相关的,就是通过语音控制OriginBot,下面是语音控制的逻辑图。

下面是利用腾讯云语音识别服务相关的代码:

# 文件位置是:backend/backend/voice_control/views.py
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.asr.v20190614 import asr_client, models
from rest_framework.views import APIView
from rest_framework.response import Response
from django.core.files.uploadedfile import InMemoryUploadedFile
import base64
import os
import json
import time
import logging

from utils.chatgpt import ChatGPT


logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)


class VoiceControl(APIView):
    def post(self, request, *args, **kwargs):
        # 获取到前端传过来的语音内容
        audio_file = request.data.get('audio_file')

        if not audio_file:
            return Response({'status': 'error', 'message': '没有接收到音频数据'})

        assert isinstance(audio_file, InMemoryUploadedFile)

        audio_bytes = audio_file.read()

        cred = credential.Credential(
            os.environ.get('TENCENT_SECRET_ID'), os.environ.get('TENCENT_SECRET_KEY')
        )
        httpProfile = HttpProfile()
        httpProfile.endpoint = "asr.tencentcloudapi.com"

        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        client = asr_client.AsrClient(cred, "ap-shanghai", clientProfile)

        req = models.CreateRecTaskRequest()
        params = {
            "EngineModelType": "16k_zh",
            "ChannelNum": 1,
            "ResTextFormat": 0,
            "SourceType": 1,
            "Data": base64.b64encode(audio_bytes).decode('utf-8'),
            "DataLen": len(audio_bytes),
        }
        req.from_json_string(json.dumps(params))

        try:
            resp = client.CreateRecTask(req)
            taskId = json.loads(resp.to_json_string())["Data"]["TaskId"]

            # 轮询获取任务结果
            while True:
                time.sleep(1)
                req = models.DescribeTaskStatusRequest()
                params = {"TaskId": taskId}
                req.from_json_string(json.dumps(params))
                resp = client.DescribeTaskStatus(req)
                status = json.loads(resp.to_json_string())["Data"]["Status"]

                if status == 2:  # 任务完成
                    transcript = json.loads(resp.to_json_string())["Data"]["Result"]
                    logging.info('识别结果:', transcript)
                    # 在这里调用chatgpt相关的功能
                    chat = ChatGPT()
                    chat.generate(transcript)
                    return Response({'status': 'success', 'transcript': transcript})
                elif status == 3:  # 任务失败
                    return Response({'status': 'error', 'message': '语音识别任务失败'})
        except Exception as e:
            return Response({'status': 'error', 'message': f'语音识别服务出错:{str(e)}'})

    def get(self, request, *args, **kwargs):
        return Response({'status': 'error', 'message': '无效的请求方法'})



下面是chatGPT相关的代码:

# 文件位置为 backend/utils/cahtgpt.py
import openai
import time
import os
import env


class ChatGPT:
    def __init__(self):
        openai.api_key = env.API_KEY
        openai.api_base = env.API_BASE
        openai.api_type = 'azure'
        openai.api_version = '2022-12-01'
        self.messages = []
        self.max_token_length = 3000
        self.max_completion_length = 1000
        self.last_response = None
        self.query = ''
        self.instruction = ''

    def extract_json_part(self, text):
        # because the json part is in the middle of the text, we need to extract it.
        # json part is between ``` and ```.
        # skip if there is no json part
        if text.find('```') == -1:
            return text
        text_json = text[text.find('```') + 3 : text.find('```', text.find('```') + 3)]
        return text_json

    def remove_leading_spaces(self, code, spaces=8):
        lines = code.split('\n')  # 按行分割代码
        processed_lines = [
            line[spaces:] if line.startswith(' ' * spaces) else line for line in lines
        ]  # 移除每一行开头的空格
        return '\n'.join(processed_lines)

    def generate(self, user_input):
        prompt = """[user]
        我会交给你一些信息、要求,请你完成任务。
        以下是信息:
        1. OriginBot是一个两轮的轮式机器人,基于ROS2 foxy开发
        2. 运动速度的topic是/cmd_vel
        以下是要求:
        1. 代码只能用Python
        2. 要基于rclpy.node的Node类拓展
        3. 代码中要考虑在合适的地方调用rclpy.shutdown()来销毁当前Node节点,而不是要求手动介入
        4. 线速度为0.2米/s
        5. 输出只能包含markdown格式的python代码,格式模板如下:
        ``python
        ###---
        import rclpy
        from geometry_msgs.msg import Twist
        from rclpy.node import Node


        class OriginBot(Node):
            def __init__(self):
                ...

            def xxx_function(self):
                ...

        def main(args=None):
            rclpy.init(args=args)
            originbot = OriginBot()
            rclpy.spin(originbot)
            originbot.destroy_node()
            rclpy.shutdown()

        if __name__ == '__main__':
            main()
        ###---
        ``
        代码开头和结尾的###---都必须要显示的输出,并且最后要调用main函数
        以下是任务: 
        """
        prompt += user_input
        response = openai.Completion.create(
            engine=env.ENGINE,
            prompt=prompt,
            temperature=0.1,
            max_tokens=self.max_completion_length,
            top_p=0.95,
            frequency_penalty=0.0,
            presence_penalty=0.0,
            stop=None,
        )
        text = response['choices'][0]['text']
        print(text)
        res_list = text.split("###---")
        res = res_list[1]
        res = self.remove_leading_spaces(res)
        with open('OriginBot.py', 'w') as f:
            f.write(res)
        os.system('python3 OriginBot.py')
        time.sleep(30)
        os.remove('OriginBot.py')

    def execute(self):
        os.system('python3 OriginBot.py')
        time.sleep(30)
        os.remove('OriginBot.py')

关于这一部分,目前的逻辑是通过prompt来使GPT生成代码进而控制机器人,其实还有另外一个思路,我们实现写好各种小单元的代码,例如左转10度,前进1米,后退0.2米等等,然后做一个mapping,最后把这个mapping告诉GPT,然后GPT根据语音来调用这些代码,这样稳定性更高一些。

注意

在运行代码之前,一定要在backend/utils/目录下创建一个env.py, 这里面是项目中使用到的各种各样的秘钥等重要的信息,内容如下:

API_KEY = "xxxxxxxxxxxx"
API_BASE = "xxxxxxxxxxxxxxxx"
ENGINE = "xxxxxxx"