前言
之前写过几篇基于OriginBot的家庭监控方案的博客,后来搁置了很久。
最近打算重新开启这个计划,但是想了很久发现“家庭监控方案”这个描述没有准确地表述我的想法,所以重新给这个计划想了一个名字,叫做“OriginBot家庭助理”。
关于这个计划,目前还没有非常完善的想法,初步打算把之前开发的监控和语音控制这两个空能迁移过来,并且重写前端,之前的前端是利用一些现有的代码拼接的,比较乱,也有点丑,所以打算这次也重写,好好经营这个计划。
项目框架介绍
这个项目在代码上会分为三个部分:前端、后端和OriginBot控制三个部分,下面单独介绍。
代码库在这里:originbot_home_assistant
1. 前端
这次采用前后段分离架构,基于VUE3+ElementPlus开发前端,这样能最大程度利用开源框架的现有的功能,在UI效果和开发量之前做一个平衡。
具体介绍之前,先看一下效果:
首页是语音控制的交互页面,后面会在左上角红框出添加不同的按钮导向到不同功能的交互页面。
把代码仓库克隆下来之后,进入到frontend目录下,执行以下命令行以运行前端
npm install # 安装第三方依赖包
npm run dev # 以开发模式运行VUE3
由于前端部分涉及到一些VUE3的内容,我不打算写代码实现的部分,有兴趣的同学可以看我以前写的关于VUE3的系列博客。
2. 后端
后端是基于Django + Django Rest Framework 开发的。
Django是一个非常完善、好用的python web后端框架,Django Rest Framework为其提供了API能力。
进入到代码仓库的backend目录下,执行以下命令:
pip install -r requirements.tct
python manage.py migrate
python manage.py runserver # 以开放的模式在本地运行
跟前端的代码一样,后端部分涉及到很多跟Django和web开发相关的细节,我不会在这里详细写,有兴趣的同学同样可以参考我以前写的博客 。
3. OriginBot控制
OriginBot控制部分代码会嵌套在后端的代码里面,所以放在了 backend/utils/
这个目录里面。
目前只有一个相关的代码,是跟语音控制相关的,就是通过语音控制OriginBot,下面是语音控制的逻辑图。
下面是利用腾讯云语音识别服务相关的代码:
# 文件位置是:backend/backend/voice_control/views.py
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.asr.v20190614 import asr_client, models
from rest_framework.views import APIView
from rest_framework.response import Response
from django.core.files.uploadedfile import InMemoryUploadedFile
import base64
import os
import json
import time
import logging
from utils.chatgpt import ChatGPT
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
class VoiceControl(APIView):
def post(self, request, *args, **kwargs):
# 获取到前端传过来的语音内容
audio_file = request.data.get('audio_file')
if not audio_file:
return Response({'status': 'error', 'message': '没有接收到音频数据'})
assert isinstance(audio_file, InMemoryUploadedFile)
audio_bytes = audio_file.read()
cred = credential.Credential(
os.environ.get('TENCENT_SECRET_ID'), os.environ.get('TENCENT_SECRET_KEY')
)
httpProfile = HttpProfile()
httpProfile.endpoint = "asr.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = asr_client.AsrClient(cred, "ap-shanghai", clientProfile)
req = models.CreateRecTaskRequest()
params = {
"EngineModelType": "16k_zh",
"ChannelNum": 1,
"ResTextFormat": 0,
"SourceType": 1,
"Data": base64.b64encode(audio_bytes).decode('utf-8'),
"DataLen": len(audio_bytes),
}
req.from_json_string(json.dumps(params))
try:
resp = client.CreateRecTask(req)
taskId = json.loads(resp.to_json_string())["Data"]["TaskId"]
# 轮询获取任务结果
while True:
time.sleep(1)
req = models.DescribeTaskStatusRequest()
params = {"TaskId": taskId}
req.from_json_string(json.dumps(params))
resp = client.DescribeTaskStatus(req)
status = json.loads(resp.to_json_string())["Data"]["Status"]
if status == 2: # 任务完成
transcript = json.loads(resp.to_json_string())["Data"]["Result"]
logging.info('识别结果:', transcript)
# 在这里调用chatgpt相关的功能
chat = ChatGPT()
chat.generate(transcript)
return Response({'status': 'success', 'transcript': transcript})
elif status == 3: # 任务失败
return Response({'status': 'error', 'message': '语音识别任务失败'})
except Exception as e:
return Response({'status': 'error', 'message': f'语音识别服务出错:{str(e)}'})
def get(self, request, *args, **kwargs):
return Response({'status': 'error', 'message': '无效的请求方法'})
下面是chatGPT相关的代码:
# 文件位置为 backend/utils/cahtgpt.py
import openai
import time
import os
import env
class ChatGPT:
def __init__(self):
openai.api_key = env.API_KEY
openai.api_base = env.API_BASE
openai.api_type = 'azure'
openai.api_version = '2022-12-01'
self.messages = []
self.max_token_length = 3000
self.max_completion_length = 1000
self.last_response = None
self.query = ''
self.instruction = ''
def extract_json_part(self, text):
# because the json part is in the middle of the text, we need to extract it.
# json part is between ``` and ```.
# skip if there is no json part
if text.find('```') == -1:
return text
text_json = text[text.find('```') + 3 : text.find('```', text.find('```') + 3)]
return text_json
def remove_leading_spaces(self, code, spaces=8):
lines = code.split('\n') # 按行分割代码
processed_lines = [
line[spaces:] if line.startswith(' ' * spaces) else line for line in lines
] # 移除每一行开头的空格
return '\n'.join(processed_lines)
def generate(self, user_input):
prompt = """[user]
我会交给你一些信息、要求,请你完成任务。
以下是信息:
1. OriginBot是一个两轮的轮式机器人,基于ROS2 foxy开发
2. 运动速度的topic是/cmd_vel
以下是要求:
1. 代码只能用Python
2. 要基于rclpy.node的Node类拓展
3. 代码中要考虑在合适的地方调用rclpy.shutdown()来销毁当前Node节点,而不是要求手动介入
4. 线速度为0.2米/s
5. 输出只能包含markdown格式的python代码,格式模板如下:
``python
###---
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
class OriginBot(Node):
def __init__(self):
...
def xxx_function(self):
...
def main(args=None):
rclpy.init(args=args)
originbot = OriginBot()
rclpy.spin(originbot)
originbot.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
###---
``
代码开头和结尾的###---都必须要显示的输出,并且最后要调用main函数
以下是任务:
"""
prompt += user_input
response = openai.Completion.create(
engine=env.ENGINE,
prompt=prompt,
temperature=0.1,
max_tokens=self.max_completion_length,
top_p=0.95,
frequency_penalty=0.0,
presence_penalty=0.0,
stop=None,
)
text = response['choices'][0]['text']
print(text)
res_list = text.split("###---")
res = res_list[1]
res = self.remove_leading_spaces(res)
with open('OriginBot.py', 'w') as f:
f.write(res)
os.system('python3 OriginBot.py')
time.sleep(30)
os.remove('OriginBot.py')
def execute(self):
os.system('python3 OriginBot.py')
time.sleep(30)
os.remove('OriginBot.py')
关于这一部分,目前的逻辑是通过prompt来使GPT生成代码进而控制机器人,其实还有另外一个思路,我们实现写好各种小单元的代码,例如左转10度,前进1米,后退0.2米等等,然后做一个mapping,最后把这个mapping告诉GPT,然后GPT根据语音来调用这些代码,这样稳定性更高一些。
注意
在运行代码之前,一定要在backend/utils/目录下创建一个env.py, 这里面是项目中使用到的各种各样的秘钥等重要的信息,内容如下:
API_KEY = "xxxxxxxxxxxx"
API_BASE = "xxxxxxxxxxxxxxxx"
ENGINE = "xxxxxxx"
评论(2)
您还未登录,请登录后发表或查看评论