Video Streaming and Image Development

In the earlier sections we prepared the power supply and the initial bring-up of the board. This section covers the groundwork for image and video development on the RDK X3 Module.

Installing the GStreamer Dependencies

Because GStreamer is used to capture the video streams, its dependencies need to be installed first.
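
The exact packages depend on which pipeline elements you use; on a Debian/Ubuntu-based image, a typical set looks like this (a sketch, not a definitive list):

sudo apt-get install -y libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
    gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
    gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools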

Building and Installing OpenCV from Source

Although the RDK X3 Module already integrates video-stream and media development modules, the OpenCV stack is what we use most in daily work, so we rebuild OpenCV from source on the board, stripping the modules we do not need and keeping only the required ones. The real trigger was discovering that the board's stock OpenCV was built without GStreamer support, so we recompile OpenCV from source with the GStreamer video I/O backend enabled.
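
Before recompiling, it is worth confirming whether the stock build really lacks GStreamer support. OpenCV's getBuildInformation() reports the enabled backends; a minimal check in Python:

import cv2

# Print the Video I/O entries; look for "GStreamer: YES" or "NO".
for line in cv2.getBuildInformation().splitlines():
    if "GStreamer" in line:
        print(line.strip())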

A Minimal-Module OpenCV Project

Here is an open-source repository that builds OpenCV with a minimal set of modules on ARM Linux. In embedded development, the two biggest headaches are memory and networking. A full OpenCV build is a tall order for a board with little memory, so the modules you do not need can be stripped out through the build configuration.

https://github.com/nihui/opencv-mobile

The repository describes itself as "The minimal opencv for Android, iOS, ARM Linux, Windows, Linux, MacOS, WebAssembly".

The project includes the build configurations used for its minimal-module builds, and you can also tailor the configuration yourself to compile exactly the functional modules you want, as sketched below.
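
As an illustration (the module choices here are arbitrary examples, not a recommendation), individual modules can be switched off with -D BUILD_opencv_<module>=OFF when configuring:

cmake -D CMAKE_BUILD_TYPE=RELEASE \
      -D BUILD_opencv_dnn=OFF \
      -D BUILD_opencv_ml=OFF \
      -D BUILD_opencv_photo=OFF \
      -D BUILD_TESTS=OFF -D BUILD_PERF_TESTS=OFF \
      ..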

Installing Dependencies

sudo apt-get install -y build-essential libgtk-3-dev freeglut3-dev

CMake Configuration

Here we use OpenCV 4.1.1 together with the matching version of the opencv_contrib modules.
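
One way to fetch matching versions is to clone the tagged releases directly (assuming network access from the build machine):

git clone -b 4.1.1 --depth 1 https://github.com/opencv/opencv.git
git clone -b 4.1.1 --depth 1 https://github.com/opencv/opencv_contrib.git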

However you download them, pay attention to the directory layout afterwards:

.
├── opencv
└── opencv_contrib

Note that opencv and opencv_contrib must sit under the same parent directory. Then enter the opencv source root and create a build directory:

mkdir build 
cd build

Then run the following configuration command:

cmake -D CMAKE_BUILD_TYPE=RELEASE -D CMAKE_INSTALL_PREFIX=/usr/local -D OPENCV_GENERATE_PKGCONFIG=ON `cat ../opencv4_cmake_options.txt` -D OPENCV_EXTRA_MODULES_PATH=../../opencv_contrib/modules/ ..

Recommended reference for learning CMake:

CMake Cookbook (Chinese translation)

Build and install:

make
sudo make install

FAQ

  • Q: 'FlannBasedMatcher' is not a member of 'cv'

    A: The flann module was disabled; re-enable it (e.g. -D BUILD_opencv_flann=ON) and rebuild.

  • Q: 'ppf_match_3d' was not declared in this scope

    A: Run make clean and build again.

UnitreeCameraSDK

Building and Installing

cd UnitreecameraSDK
mkdir build
cd build
cmake ..
make

Change the include paths to:

#include <opencv4/opencv2/videoio.hpp>
#include <opencv4/opencv2/highgui.hpp>

Common Errors

  • Q: The build fails with many linker errors about armadillo.so.9

    A: libarmadillo-dev is missing; install it:

    sudo apt install libarmadillo-dev
    
  • Q: CMake error: cannot find -lEigen3::Eigen

    A: Add the following to the CMakeLists.txt file:

find_package(Eigen3 REQUIRED) 
include_directories(${EIGEN3_INCLUDE_DIRS})

Summary

So after all of the work above, did the port based on the minimal OpenCV build fail in the end? It did not. Now let's get straight to the key points.

Notes on Installing OpenCV from Source

In the build configuration, make sure to enable these options:

-DBUILD_opencv_imgcodecs=ON
-DBUILD_opencv_videoio=ON
-DBUILD_opencv_calib3d=ON
# The https://github.com/nihui/opencv-mobile project rewrites the image I/O functions in the highgui module; without modifying the code, these options must be enabled.

List the installed OpenCV libraries on Linux:

pkg-config opencv4 --libs

Check the installed OpenCV version:

pkg-config opencv4 --modversion

Find the OpenCV installation paths:

sudo find / -iname "*opencv4*"

To recap: after digging into OpenCV's imread and related interfaces, it turns out that setting

BUILD_opencv_imgcodecs=ON
BUILD_opencv_videoio=ON
BUILD_opencv_calib3d=ON

is a necessary condition for VideoCapture and the other read interfaces to work, as long as the code itself is left unmodified.
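
A quick way to verify that the rebuilt OpenCV can actually open a stream through GStreamer is to hand VideoCapture a pipeline string together with the CAP_GSTREAMER backend. The pipeline below is only a sketch and the device path is an assumption; adapt it to your board:

import cv2

# Hypothetical V4L2 source; replace /dev/video8 with your actual device.
pipeline = "v4l2src device=/dev/video8 ! videoconvert ! appsink"
cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
print("opened:", cap.isOpened())
cap.release()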

Installing Mediapipe

It is already 2023: on the RDK X3 Module, installing mediapipe takes nothing more than pip:

pip install mediapipe

About Mediapipe

Mediapipe is an open-source project from Google that provides open, cross-platform machine-learning solutions for common tasks. In practice it is a toolkit of integrated machine-learning vision algorithms, with models for face detection, face landmarks, gesture recognition, portrait segmentation, pose estimation, and more.

Mediapipe's advantages:
1) it supports many platforms and languages, including iOS, Android, C++, Python, JavaScript, Coral, and so on;
2) it is fast: most models can run in real time.

Examples of Mediapipe in real applications:

1) face detection;
2) FaceMesh: reconstructs a 3D mesh of the face from images/video, which can be used for AR rendering;
3) portrait segmentation: separates people from the background in images/video, used in video conferencing tools such as Zoom and DingTalk;
4) hand gesture recognition and tracking: detects and labels the 3D coordinates of 21 hand keypoints;
5) human pose estimation: detects and labels the 3D coordinates of 33 body keypoints.

Once the installation is done, you can start developing the applications you want. Below, Example 1 opens the camera and Example 2 performs hand gesture recognition and tracking.

Example 1: Opening the Camera

# Imports
import cv2

# Camera source: instead of a device index, a GStreamer pipeline string can
# be passed, e.g.:
# udpX264Recv = "udpsrc address=192.168.123.15 port=9200 ! application/x-rtp,media=video,encoding-name=H264 ! rtph264depay ! h264parse ! omxh264dec ! videoconvert ! appsink"
# cap = cv2.VideoCapture(udpX264Recv)

# Instantiate the camera object
cap = cv2.VideoCapture(8)

# Check that the camera opened
if not cap.isOpened():
    print("VideoCapture Failed!")
    exit(1)

# Read and display the video stream
while True:
    success, frame = cap.read()
    if not success:  # stop if no frame could be read
        break
    img = frame[0:400, 0:464]  # crop the left-hand image
    cv2.imshow("camera", img)  # show the frame in a window
    if cv2.waitKey(1) & 0xFF == ord('q'):  # wait 1 ms; quit when 'q' is pressed
        break

# Clean up
cap.release()  # release the camera object
cv2.destroyAllWindows()  # close the window
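
The device index 8 above is specific to this setup. If it does not work on your board, list the available video devices first and try the matching indices:

ls /dev/video*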

Example 2: Hand Gesture Recognition and Tracking

import csv
import copy
import argparse
import itertools
import sys
sys.path.append("..") 
from collections import Counter
from collections import deque

import cv2 as cv
import numpy as np
import mediapipe as mp

from utils import CvFpsCalc
from model import KeyPointClassifier
from model import PointHistoryClassifier
from libs import Unitree_Robot

def get_args():
    parser = argparse.ArgumentParser()

    parser.add_argument("--device", type=int, default=0)
    parser.add_argument("--width", help='cap width', type=int, default=960)
    parser.add_argument("--height", help='cap height', type=int, default=540)

    parser.add_argument('--use_static_image_mode', action='store_true')
    parser.add_argument("--min_detection_confidence",
                        help='min_detection_confidence',
                        type=float,
                        default=0.7)
    parser.add_argument("--min_tracking_confidence",
                        help='min_tracking_confidence',
                        type=int,
                        default=0.5)

    args = parser.parse_args()

    return args


def main():
    # Parse arguments ##########################################################
    args = get_args()

    cap_device = args.device
    cap_width = args.width
    cap_height = args.height

    use_static_image_mode = args.use_static_image_mode
    min_detection_confidence = args.min_detection_confidence
    min_tracking_confidence = args.min_tracking_confidence

    use_brect = True

    # Initialize the camera ####################################################
    cap = cv.VideoCapture(cap_device)
    cap.set(cv.CAP_PROP_FRAME_WIDTH, cap_width)
    cap.set(cv.CAP_PROP_FRAME_HEIGHT, cap_height)

    # Load the models ######################################################
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(
        static_image_mode=use_static_image_mode,
        max_num_hands=1,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
    )

    keypoint_classifier = KeyPointClassifier()

    point_history_classifier = PointHistoryClassifier()

    # Read the labels ####################################################
    with open('model/keypoint_classifier/keypoint_classifier_label.csv',
              encoding='utf-8-sig') as f:
        keypoint_classifier_labels = csv.reader(f)
        keypoint_classifier_labels = [
            row[0] for row in keypoint_classifier_labels
        ]
    with open(
            'model/point_history_classifier/point_history_classifier_label.csv',
            encoding='utf-8-sig') as f:
        point_history_classifier_labels = csv.reader(f)
        point_history_classifier_labels = [
            row[0] for row in point_history_classifier_labels
        ]

    # FPS measurement ################################################
    cvFpsCalc = CvFpsCalc(buffer_len=10)

    # Coordinate history queue #################################################
    history_length = 16
    point_history = deque(maxlen=history_length)

    # Finger gesture history queue ##########################################
    finger_gesture_history = deque(maxlen=history_length)

    # Initialize the robot #####################################################
    mode = 0
    unitree_go1 = Unitree_Robot()
    rotate = rotate2 = 0
    while True:
        fps = cvFpsCalc.get()

        # Key handling (ESC: quit) #########################################
        key = cv.waitKey(10) & 0xff
        if key == 27:  # ESC
            break
        number, mode = select_mode(key, mode)

        # Camera capture ###############################################
        ret, image = cap.read()
        if not ret:
            break
        image = cv.flip(image, 1)  # mirror the image
        debug_image = copy.deepcopy(image)

        # Detection ########################################################
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        image.flags.writeable = False
        results = hands.process(image)
        image.flags.writeable = True

        # Landmark post-processing #############################################
        if results.multi_hand_landmarks is not None:
            for hand_landmarks, handedness in zip(results.multi_hand_landmarks,
                                                  results.multi_handedness):
                # Compute the bounding rectangle
                brect = calc_bounding_rect(debug_image, hand_landmarks)
                # Compute the landmark list
                landmark_list = calc_landmark_list(debug_image, hand_landmarks)

                # Convert to relative / normalized coordinates
                pre_processed_landmark_list = pre_process_landmark(
                    landmark_list)
                pre_processed_point_history_list = pre_process_point_history(
                    debug_image, point_history)
                # Save the prediction data
                logging_csv(number, mode, pre_processed_landmark_list,
                            pre_processed_point_history_list)

                # Hand sign classification
                hand_sign_id = keypoint_classifier(pre_processed_landmark_list)
                print(hand_sign_id)
                if hand_sign_id == 2:  # pointing sign
                    point_history.append(landmark_list[8])  # index fingertip
                # if hand_sign_id == 3:
                #     print("Open! dog go ahead!")
                #     state = unitree_go1.robot_walking(gaitType = 1, forwardSpeed = 0.15, sidewaySpeed = 0.0, rotateSpeed = 0, speedLevel = 0, bodyHeight = 0)
                #     #rotate2 =0
                #     break
                # if hand_sign_id == 1 :
                #     print("Close! dog go back!")
                #     state = unitree_go1.robot_walking(gaitType = 1, forwardSpeed = -0.15, sidewaySpeed = 0.0, rotateSpeed = 0, speedLevel = 0, bodyHeight = 0)
                #     #rotate2 =0
                #     break
                # if hand_sign_id == 0 :#or hand_sign_id == 2:
                #     print("OK! dog stop!")
                #     rotate = 0.05
                #     rotate2 += 1
                #     if (rotate2//5)%2 == 1:
                #         rotate = - rotate
                #         #print("rotate:{}".format(rotate))
                #         #print("rotate2:{}".format(rotate2))
                #         state = unitree_go1.robot_walking(gaitType = 1, forwardSpeed = 0, sidewaySpeed = 0.0, rotateSpeed = rotate, speedLevel = 0, bodyHeight = 0)
                #         break
                else:
                    point_history.append([0, 0])


                # Finger gesture classification
                finger_gesture_id = 0
                point_history_len = len(pre_processed_point_history_list)
                if point_history_len == (history_length * 2):
                    finger_gesture_id = point_history_classifier(
                        pre_processed_point_history_list)

                # Take the most frequent gesture ID among recent detections
                finger_gesture_history.append(finger_gesture_id)
                most_common_fg_id = Counter(
                    finger_gesture_history).most_common()
                print(most_common_fg_id)
                # Drawing
                debug_image = draw_bounding_rect(use_brect, debug_image, brect)
                debug_image = draw_landmarks(debug_image, landmark_list)
                debug_image = draw_info_text(
                    debug_image,
                    brect,
                    handedness,
                    keypoint_classifier_labels[hand_sign_id],
                    point_history_classifier_labels[most_common_fg_id[0][0]],
                )
        else:
            point_history.append([0, 0])

        debug_image = draw_point_history(debug_image, point_history)
        debug_image = draw_info(debug_image, fps, mode, number)

        # Show the result #############################################
        cv.imshow('Hand Gesture Recognition', debug_image)

    cap.release()
    cv.destroyAllWindows()


def select_mode(key, mode):
    number = -1
    if 48 <= key <= 57:  # 0 ~ 9
        number = key - 48
    if key == 110:  # n
        mode = 0
    if key == 107:  # k
        mode = 1
    if key == 104:  # h
        mode = 2
    return number, mode


def calc_bounding_rect(image, landmarks):
    image_width, image_height = image.shape[1], image.shape[0]

    landmark_array = np.empty((0, 2), int)

    for _, landmark in enumerate(landmarks.landmark):
        landmark_x = min(int(landmark.x * image_width), image_width - 1)
        landmark_y = min(int(landmark.y * image_height), image_height - 1)

        landmark_point = [np.array((landmark_x, landmark_y))]

        landmark_array = np.append(landmark_array, landmark_point, axis=0)

    x, y, w, h = cv.boundingRect(landmark_array)

    return [x, y, x + w, y + h]


def calc_landmark_list(image, landmarks):
    image_width, image_height = image.shape[1], image.shape[0]

    landmark_point = []

    # Keypoints
    for _, landmark in enumerate(landmarks.landmark):
        landmark_x = min(int(landmark.x * image_width), image_width - 1)
        landmark_y = min(int(landmark.y * image_height), image_height - 1)
        # landmark_z = landmark.z

        landmark_point.append([landmark_x, landmark_y])

    return landmark_point


def pre_process_landmark(landmark_list):
    temp_landmark_list = copy.deepcopy(landmark_list)

    # Convert to relative coordinates
    base_x, base_y = 0, 0
    for index, landmark_point in enumerate(temp_landmark_list):
        if index == 0:
            base_x, base_y = landmark_point[0], landmark_point[1]

        temp_landmark_list[index][0] = temp_landmark_list[index][0] - base_x
        temp_landmark_list[index][1] = temp_landmark_list[index][1] - base_y

    # Flatten to a one-dimensional list
    temp_landmark_list = list(
        itertools.chain.from_iterable(temp_landmark_list))

    # Normalize
    max_value = max(list(map(abs, temp_landmark_list)))

    def normalize_(n):
        return n / max_value

    temp_landmark_list = list(map(normalize_, temp_landmark_list))

    return temp_landmark_list


def pre_process_point_history(image, point_history):
    image_width, image_height = image.shape[1], image.shape[0]

    temp_point_history = copy.deepcopy(point_history)

    # Convert to relative coordinates
    base_x, base_y = 0, 0
    for index, point in enumerate(temp_point_history):
        if index == 0:
            base_x, base_y = point[0], point[1]

        temp_point_history[index][0] = (temp_point_history[index][0] -
                                        base_x) / image_width
        temp_point_history[index][1] = (temp_point_history[index][1] -
                                        base_y) / image_height

    # Flatten to a one-dimensional list
    temp_point_history = list(
        itertools.chain.from_iterable(temp_point_history))

    return temp_point_history


def logging_csv(number, mode, landmark_list, point_history_list):
    if mode == 0:
        pass
    if mode == 1 and (0 <= number <= 9):
        csv_path = 'model/keypoint_classifier/keypoint.csv'
        with open(csv_path, 'a', newline="") as f:
            writer = csv.writer(f)
            writer.writerow([number, *landmark_list])
    if mode == 2 and (0 <= number <= 9):
        csv_path = 'model/point_history_classifier/point_history.csv'
        with open(csv_path, 'a', newline="") as f:
            writer = csv.writer(f)
            writer.writerow([number, *point_history_list])
    return
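
# Note: the drawing helpers used above (draw_bounding_rect, draw_landmarks,
# draw_info_text, draw_point_history, draw_info) are not shown here; they
# come from the upstream hand-gesture-recognition example this script is
# based on.

if __name__ == '__main__':
    main()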

Note that this example also depends on TensorFlow. The RDK has no GPU, so the CPU build is all we need; it can likewise be installed with pip (pip install tensorflow).