In the previous post we covered the Raspberry Pi camera's libcamera stack, which provides command-line tools for operating the camera. Today's topic is picamera2: the official Raspberry Pi Python library built on top of the libcamera driver. Picamera2 supports only Raspberry Pi OS Bullseye and newer.

On Raspberry Pi OS Bullseye and newer, picamera2 comes pre-installed with the system, so there is no separate installation step.

Before we start programming with Python and picamera2, let's first use the libcamera tools to check that the camera works:

libcamera-hello -t 0

As described in the previous post, this command opens a preview window showing the video stream; a timeout of 0 means it runs indefinitely.
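The timeout is given in milliseconds, so for a quick five-second check you could run instead:

libcamera-hello -t 5000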

picamera2 is installed into the system Python environment by default. However, on the Raspberry Pi, installing new packages into the system environment with pip install fails with the following error:

yan@raspberrypi:~ $ pip3 install pytesseract
error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
    python3-xyz, where xyz is the package you are trying to
    install.

    If you wish to install a non-Debian-packaged Python package,
    create a virtual environment using python3 -m venv path/to/venv.
    Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
    sure you have python3-full installed.

    For more information visit http://rptl.io/venv

note: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-system-packages.
hint: See PEP 668 for the detailed specification.

We therefore create a virtual environment in which to install the other packages we need. This virtual environment must inherit all of the system environment's packages (chiefly picamera2):

python3 -m venv --system-site-packages diy_camera
source diy_camera/bin/activate

We then install any additional packages inside this diy_camera virtual environment.
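To confirm that the virtual environment really does see the system-wide picamera2, a quick check (run with the environment active):

python3 -c "import picamera2; print(picamera2.__file__)"

This should print a path under /usr/lib/python3/dist-packages rather than raising ImportError.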

While we're at it, let's point pip at a faster mirror.

pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/
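You can confirm the mirror took effect with:

pip config list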

Now let's write the simplest possible picamera2 test program.

A simple test program

from picamera2 import Picamera2, Preview
import time

picam2 = Picamera2()
camera_config = picam2.create_preview_configuration()
picam2.configure(camera_config)
picam2.start_preview(Preview.QTGL)
picam2.start()
time.sleep(2)

picam2.capture_file("test.jpg")

For the first argument of picam2.start_preview, you can use Preview.DRM instead of Preview.QTGL. The picamera2 documentation puts it this way: "Non X Windows users should use the same script, but replacing Preview.QTGL by Preview.DRM, so as to use the non X Windows preview implementation".

In other words, on a system without X Windows (i.e. a Linux system with no graphical desktop), you should use Preview.DRM in place of Preview.QTGL to get the non-X preview implementation.

Specifically:

  • Preview.QTGL is the preview type for systems running X Windows; it displays the preview in a Qt window with OpenGL acceleration.
  • Preview.DRM is the preview type for systems without X Windows; it uses the Direct Rendering Manager (DRM), the Linux kernel's graphics interface, to display images without needing X.

So in an environment without X Windows, such as a headless server or an embedded system with no graphical desktop, use Preview.DRM:

from picamera2 import Picamera2, Preview
import time
picam2 = Picamera2()
camera_config = picam2.create_preview_configuration()
picam2.configure(camera_config)
picam2.start_preview(Preview.DRM)
picam2.start()
time.sleep(2)
picam2.capture_file("test.jpg")
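If the same script has to run both on the desktop and headless, one option is to pick the preview type from the DISPLAY environment variable. This is just a sketch of mine, not something picamera2 provides; the heuristic assumes X sets DISPLAY:

import os
import time

from picamera2 import Picamera2, Preview

# Use the Qt/OpenGL preview when an X display is available, DRM otherwise
preview_type = Preview.QTGL if os.environ.get("DISPLAY") else Preview.DRM

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start_preview(preview_type)
picam2.start()
time.sleep(2)
picam2.capture_file("test.jpg")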

The picamera2 high-level API

The capture flow above can be made even simpler with picamera2's high-level API, which wraps the whole photo-taking process.

from picamera2 import Picamera2
picam2 = Picamera2()
picam2.start_and_capture_file("test.jpg")

This captures a single full-resolution photo.

picamera2 examples

Noise reduction by averaging multiple frames

#!/usr/bin/python3

"""Example comparing capturing a single photo vs capturing multiple photos and averaging to try to reduce noise"""

import time

import numpy as np
from PIL import Image

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.NULL)
capture_config = picam2.create_still_configuration()
picam2.configure(capture_config)

picam2.start()
time.sleep(2)

with picam2.controls as ctrl:
    ctrl.AnalogueGain = 1.0
    ctrl.ExposureTime = 250000
time.sleep(2)

imgs = 3  # Capture 3 images to average
sumv = None
for i in range(imgs):
    if sumv is None:
        sumv = np.longdouble(picam2.capture_array())
        img = Image.fromarray(np.uint8(sumv))
        img.save("original.tif")
    else:
        sumv += np.longdouble(picam2.capture_array())

img = Image.fromarray(np.uint8(sumv / imgs))
img.save("averaged.tif")
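To sanity-check the improvement, compare the pixel noise of the two outputs. A crude sketch, assuming the [100:200, 100:200] patch falls on a reasonably flat part of your scene:

import numpy as np
from PIL import Image

orig = np.asarray(Image.open("original.tif"), dtype=np.float64)
avg = np.asarray(Image.open("averaged.tif"), dtype=np.float64)

# The standard deviation in a flat patch approximates the noise level;
# averaging N frames should reduce it by roughly sqrt(N).
print("original std:", orig[100:200, 100:200].std())
print("averaged std:", avg[100:200, 100:200].std())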

Capturing raw (DNG) and JPEG images at the same time

#!/usr/bin/python3

# Capture a DNG and a JPEG made from the same raw data.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
capture_config = picam2.create_still_configuration(raw={})
picam2.configure(preview_config)

picam2.start()
time.sleep(2)

buffers, metadata = picam2.switch_mode_and_capture_buffers(capture_config, ["main", "raw"])
picam2.helpers.save(picam2.helpers.make_image(buffers[0], capture_config["main"]), metadata, "full.jpg")
picam2.helpers.save_dng(buffers[1], metadata, capture_config["raw"], "full.dng")

It can also be written like this:

#!/usr/bin/python3

# Capture a DNG and a JPEG made from the same raw data.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
capture_config = picam2.create_still_configuration(raw={}, display=None)
picam2.configure(preview_config)

picam2.start()
time.sleep(2)

r = picam2.switch_mode_capture_request_and_stop(capture_config)
r.save("main", "full.jpg")
r.save_dng("full.dng")

Capturing a JPEG

#!/usr/bin/python3

# Capture a JPEG while still running in the preview mode. When you
# capture to a file, the return value is the metadata for that image.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()

preview_config = picam2.create_preview_configuration(main={"size": (800, 600)})
picam2.configure(preview_config)

picam2.start_preview(Preview.QTGL)

picam2.start()
time.sleep(2)

metadata = picam2.capture_file("test.jpg")
print(metadata)

picam2.close()

Capturing a PNG

#!/usr/bin/python3

# Capture a PNG while still running in the preview mode.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration(main={"size": (800, 600)})
picam2.configure(preview_config)

picam2.start()
time.sleep(2)

picam2.capture_file("test.png")

Capturing multiple photos in succession

#!/usr/bin/python3
import time

from picamera2 import Picamera2

picam2 = Picamera2()
picam2.configure("still")
picam2.start()

# Give time for Aec and Awb to settle, before disabling them
time.sleep(1)
picam2.set_controls({"AeEnable": False, "AwbEnable": False, "FrameRate": 1.0})
# And wait for those settings to take effect
time.sleep(1)

start_time = time.time()
for i in range(1, 51):
    r = picam2.capture_request()
    r.save("main", f"image{i}.jpg")
    r.release()
    print(f"Captured image {i} of 50 at {time.time() - start_time:.2f}s")


picam2.stop()

Capturing image data to a buffer

#!/usr/bin/python3

import io
import time

from picamera2 import Picamera2

picam2 = Picamera2()
capture_config = picam2.create_still_configuration()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

time.sleep(1)
data = io.BytesIO()
picam2.capture_file(data, format='jpeg')
print(data.getbuffer().nbytes)

time.sleep(1)
data = io.BytesIO()
picam2.switch_mode_and_capture_file(capture_config, data, format='jpeg')
print(data.getbuffer().nbytes)
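The in-memory buffer behaves like any file object, which is handy when you want to, say, send the JPEG over the network without touching the SD card. A small sketch on top of the script above, decoding the buffer back into a PIL image to confirm it holds a valid JPEG:

from PIL import Image

data.seek(0)  # rewind to the start of the in-memory JPEG
img = Image.open(data)
print(img.size, img.format)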

Changing the camera's capture controls

Method 1

#!/usr/bin/python3

# Example of setting controls. Here, after one second, we fix the AGC/AEC
# to the values it has reached whereafter it will no longer change.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)

picam2.start()
time.sleep(1)

metadata = picam2.capture_metadata()
print(f'metadata = {metadata}')
controls = {c: metadata[c] for c in ["ExposureTime", "AnalogueGain", "ColourGains"]}
print(controls)

picam2.set_controls(controls)
time.sleep(5)

Method 2

#!/usr/bin/python3

# Another (simpler!) way to fix the AEC/AGC and AWB.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)

picam2.start()
time.sleep(1)

picam2.set_controls({"AwbEnable": 0, "AeEnable": 0})
time.sleep(5)

Method 3

#!/usr/bin/python3

# Example of setting controls using the "direct" attribute method.

import time

from picamera2 import Picamera2, Preview
from picamera2.controls import Controls

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)

picam2.start()
time.sleep(1)

with picam2.controls as ctrl:
    ctrl.AnalogueGain = 6.0
    ctrl.ExposureTime = 60000

time.sleep(2)

ctrls = Controls(picam2)
ctrls.AnalogueGain = 1.0
ctrls.ExposureTime = 10000
picam2.set_controls(ctrls)

time.sleep(2)

The simplest way to capture an image

#!/usr/bin/python3

from picamera2 import Picamera2

picam2 = Picamera2()

# Capture one image with the default configurations.
picam2.start_and_capture_file("test.jpg")

# Capture 3 images. Use a 0.5 second delay after the first image.
picam2.start_and_capture_files("test{:d}.jpg", num_files=3, delay=0.5)  # noqa

Setting controls for the preview

#!/usr/bin/python3

# Start camera with fixed exposure and gain.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
controls = {"ExposureTime": 10000, "AnalogueGain": 1.0}
preview_config = picam2.create_preview_configuration(controls=controls)
picam2.configure(preview_config)

picam2.start()
time.sleep(5)

FrameServer: serving frames to multiple threads

#!/usr/bin/python3

# These two are only needed for the demo code below the FrameServer class.
import time
from threading import Condition, Thread

from picamera2 import Picamera2


class FrameServer:
    def __init__(self, picam2, stream='main'):
        """A simple class that can serve up frames from one of the Picamera2's configured streams to multiple other threads.

        Pass in the Picamera2 object and the name of the stream for which you want
        to serve up frames.
        """
        self._picam2 = picam2
        self._stream = stream
        self._array = None
        self._condition = Condition()
        self._running = True
        self._count = 0
        self._thread = Thread(target=self._thread_func, daemon=True)

    @property
    def count(self):
        """A count of the number of frames received."""
        return self._count

    def start(self):
        """To start the FrameServer, you will also need to start the Picamera2 object."""
        self._thread.start()

    def stop(self):
        """To stop the FrameServer

        First stop any client threads (that might be
        blocked in wait_for_frame), then call this stop method. Don't stop the
        Picamera2 object until the FrameServer has been stopped.
        """
        self._running = False
        self._thread.join()

    def _thread_func(self):
        while self._running:
            array = self._picam2.capture_array(self._stream)
            self._count += 1
            with self._condition:
                self._array = array
                self._condition.notify_all()

    def wait_for_frame(self, previous=None):
        """You may optionally pass in the previous frame that you got last time you called this function.

        This will guarantee that you don't get duplicate frames
        returned in the event of spurious wake-ups, and it may even return more
        quickly in the case where a new frame has already arrived.
        """
        with self._condition:
            if previous is not None and self._array is not previous:
                return self._array
            while True:
                self._condition.wait()
                if self._array is not previous:
                    return self._array


# Below here is just demo code that uses the class:

def thread1_func():
    global thread1_count
    while not thread_abort:
        _ = server.wait_for_frame()
        thread1_count += 1


def thread2_func():
    global thread2_count
    frame = None
    while not thread_abort:
        frame = server.wait_for_frame(frame)
        thread2_count += 1


thread_abort = False
thread1_count = 0
thread2_count = 0
thread1 = Thread(target=thread1_func)
thread2 = Thread(target=thread2_func)

picam2 = Picamera2()
server = FrameServer(picam2)
thread1.start()
thread2.start()
server.start()
picam2.start()

time.sleep(5)

thread_abort = True
thread1.join()
thread2.join()
server.stop()
picam2.stop()

print("Thread1 received", thread1_count, "frames")
print("Thread2 received", thread2_count, "frames")
print("Server received", server.count, "frames")

Face detection with OpenCV

We install OpenCV at the system level; the virtual environment we created with --system-site-packages will pick it up.

sudo apt install -y python3-opencv
sudo apt install -y opencv-data

#!/usr/bin/python3

import cv2

from picamera2 import Picamera2

# Grab images as numpy arrays and leave everything else to OpenCV.

face_detector = cv2.CascadeClassifier("/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml")
cv2.startWindowThread()

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)}))
picam2.start()

while True:
    im = picam2.capture_array()

    grey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    faces = face_detector.detectMultiScale(grey, 1.1, 5)

    for (x, y, w, h) in faces:
        cv2.rectangle(im, (x, y), (x + w, y + h), (0, 255, 0))

    cv2.imshow("Camera", im)
    cv2.waitKey(1)

The code above does not perform particularly well. Below we use the picam2.post_callback hook instead, which draws the face boxes more efficiently while the preview keeps running at full rate.

#!/usr/bin/python3
import time

import cv2

from picamera2 import MappedArray, Picamera2, Preview

# This version creates a lores YUV stream, extracts the Y channel and runs the face
# detector directly on that. We use the supplied OpenGL accelerated preview window
# and delegate the face box drawing to its callback function, thereby running the
# preview at the full rate with face updates as and when they are ready.

face_detector = cv2.CascadeClassifier("/usr/share/opencv4/haarcascades/haarcascade_frontalface_default.xml")


def draw_faces(request):
    with MappedArray(request, "main") as m:
        for f in faces:
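            # Scale each face rectangle from lores coordinates (w1, h1) up to main-stream coordinates (w0, h0)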
            (x, y, w, h) = [c * n // d for c, n, d in zip(f, (w0, h0) * 2, (w1, h1) * 2)]
            cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0))


picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)
config = picam2.create_preview_configuration(main={"size": (640, 480)},
                                             lores={"size": (320, 240), "format": "YUV420"})
picam2.configure(config)

(w0, h0) = picam2.stream_configuration("main")["size"]
(w1, h1) = picam2.stream_configuration("lores")["size"]
s1 = picam2.stream_configuration("lores")["stride"]
faces = []
picam2.post_callback = draw_faces

picam2.start()

start_time = time.monotonic()
# Run for 100 seconds, then exit.
while time.monotonic() - start_time < 100:
    buffer = picam2.capture_buffer("lores")
    grey = buffer[:s1 * h1].reshape((h1, s1))
    faces = face_detector.detectMultiScale(grey, 1.1, 3)

HDR by fusing long and short exposures

#!/usr/bin/python3

import time

import cv2
import numpy as np

from picamera2 import Picamera2

# Simple Mertens merge with 3 exposures. No image alignment or anything fancy.
RATIO = 3.0


picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Run for a second to get a reasonable "middle" exposure level.
time.sleep(1)


metadata = picam2.capture_metadata()
exposure_normal = metadata["ExposureTime"]
gain = metadata["AnalogueGain"] * metadata["DigitalGain"]
picam2.stop()
controls = {"ExposureTime": exposure_normal, "AnalogueGain": gain}
capture_config = picam2.create_preview_configuration(main={"size": (1920, 1080),
                                                            "format": "RGB888"},
                                                     controls=controls)
picam2.configure(capture_config)
picam2.start()
normal = picam2.capture_array()
picam2.stop()

st = time.time()
exposure_short = int(exposure_normal / RATIO)
picam2.set_controls({"ExposureTime": exposure_short, "AnalogueGain": gain})
picam2.start()
short = picam2.capture_array()
picam2.stop()
print(f'elapsed time: {time.time() - st}', flush=True)

exposure_long = int(exposure_normal * RATIO)
picam2.set_controls({"ExposureTime": exposure_long, "AnalogueGain": gain})
picam2.start()
long = picam2.capture_array()
picam2.stop()

merge = cv2.createMergeMertens()
merged = merge.process([short, normal, long])
merged = np.clip(merged * 255, 0, 255).astype(np.uint8)
cv2.imwrite("normal.jpg", normal)
cv2.imwrite("merged.jpg", merged)

Comparing merged.jpg with normal.jpg, the merged image suppresses the lamp's highlights better, has brighter shadows, and shows higher overall contrast. Looking at the histograms, the merged image's histogram shifts to the right as a whole, indicating higher overall brightness, and a bump appears in the shadow region, which raises the overall contrast.
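The histogram claim is easy to check numerically. A quick sketch (the dark-pixel threshold of 32 is an arbitrary choice of mine):

import cv2

for name in ("normal", "merged"):
    grey = cv2.cvtColor(cv2.imread(f"{name}.jpg"), cv2.COLOR_BGR2GRAY)
    hist = cv2.calcHist([grey], [0], None, [256], [0, 256]).ravel()
    print(name, "mean luma:", grey.mean(), "pixels below 32:", int(hist[:32].sum()))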

Raw-image APIs

#!/usr/bin/python3

# Configure a raw stream and capture an image from it.
import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration(raw={"size": picam2.sensor_resolution})
print(preview_config)
picam2.configure(preview_config)

picam2.start()
time.sleep(2)

raw = picam2.capture_array("raw")
print(raw.shape)
print(picam2.stream_configuration("raw"))

Flipping the preview image

#!/usr/bin/python3

# Run the camera with a 180 degree rotation.
import time

import libcamera

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
preview_config["transform"] = libcamera.Transform(hflip=1, vflip=1)
picam2.configure(preview_config)

picam2.start()
time.sleep(5)

Configuration for still capture

#!/usr/bin/python3

# Use the configuration structure method to do a full res capture.

import time

from picamera2 import Picamera2

picam2 = Picamera2()

# We don't really need to change anything, but let's mess around just as a test.
picam2.preview_configuration.size = (800, 600)
picam2.preview_configuration.format = "YUV420"
picam2.still_configuration.size = (1600, 1200)
picam2.still_configuration.enable_raw()
picam2.still_configuration.raw.size = picam2.sensor_resolution

picam2.start("preview", show_preview=True)
time.sleep(2)

picam2.switch_mode_and_capture_file("still", "test_full.jpg")

Displaying exposure values and other data in the preview window

#!/usr/bin/python3

import time

from picamera2 import Picamera2

picam2 = Picamera2()
picam2.start(show_preview=True)
time.sleep(0.5)

# Or you could do this before starting the camera.
picam2.title_fields = ["ExposureTime", "AnalogueGain", "DigitalGain"]
time.sleep(2)

# And you can change it too.
picam2.title_fields = ["ColourTemperature", "ColourGains"]
time.sleep(2)

yuv2rgb

#!/usr/bin/python3

import cv2

from picamera2 import Picamera2

cv2.startWindowThread()

picam2 = Picamera2()
config = picam2.create_preview_configuration(lores={"size": (640, 480)})
picam2.configure(config)
picam2.start()

while True:
    yuv420 = picam2.capture_array("lores")
    rgb = cv2.cvtColor(yuv420, cv2.COLOR_YUV420p2RGB)
    cv2.imshow("Camera", rgb)

Here, lores refers to the low-resolution stream; it is delivered as YUV420, which is why the frame is converted to RGB before display.

Digital zoom control

#!/usr/bin/python3

# How to do digital zoom using the "ScalerCrop" control.

import time

from picamera2 import Picamera2, Preview

picam2 = Picamera2()
picam2.start_preview(Preview.QTGL)

preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)

picam2.start()
time.sleep(2)

size = picam2.capture_metadata()['ScalerCrop'][2:]

full_res = picam2.camera_properties['PixelArraySize']

for _ in range(20):
    # This syncs us to the arrival of a new camera frame:
    picam2.capture_metadata()

    size = [int(s * 0.95) for s in size]
    offset = [(r - s) // 2 for r, s in zip(full_res, size)]
    picam2.set_controls({"ScalerCrop": offset + size})

time.sleep(2)
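To zoom back out again, the full field of view can be restored from the camera's ScalerCropMaximum property. A short sketch continuing the script above; I am assuming the ScalerCropMaximum property that libcamera reports for the sensor:

# Restore the full field of view
full_crop = picam2.camera_properties['ScalerCropMaximum']
picam2.set_controls({"ScalerCrop": full_crop})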

TensorFlow segmentation

First, install tflite-runtime:

source diy_camera/bin/activate
pip install tflite-runtime

#!/usr/bin/python3
## segmentation.py

# Usage: ./segmentation.py --model deeplapv3.tflite --label deeplab_labels.txt

import argparse
import select
import sys
import time

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
from PIL import Image

from picamera2 import Picamera2, Preview

normalSize = (640, 480)
lowresSize = (320, 240)

masks = {}
captured = []
segmenter = None


def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret


def InferenceTensorFlow(image, model, colours, label=None):
    global masks

    if label:
        labels = ReadLabelFile(label)
    else:
        labels = None

    interpreter = tflite.Interpreter(model_path=model, num_threads=4)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    height = input_details[0]['shape'][1]
    width = input_details[0]['shape'][2]
    o_height = output_details[0]['shape'][1]
    o_width = output_details[0]['shape'][2]
    floating_model = False
    if input_details[0]['dtype'] == np.float32:
        floating_model = True

    rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

    picture = cv2.resize(rgb, (width, height))

    input_data = np.expand_dims(picture, axis=0)
    if floating_model:
        input_data = np.float32(input_data / 255)

    interpreter.set_tensor(input_details[0]['index'], input_data)

    interpreter.invoke()

    output = interpreter.get_tensor(output_details[0]['index'])[0]

    mask = np.argmax(output, axis=-1)
    found_indices = np.unique(mask)
    colours = np.loadtxt(colours)
    new_masks = {}
    for i in found_indices:
        if i == 0:
            continue
        output_shape = [o_width, o_height, 4]
        colour = [(0, 0, 0, 0), colours[i]]
        overlay = (mask == i).astype(np.uint8)
        overlay = np.array(colour)[overlay].reshape(
            output_shape).astype(np.uint8)
        overlay = cv2.resize(overlay, normalSize)
        if labels is not None:
            new_masks[labels[i]] = overlay
        else:
            new_masks[i] = overlay
    masks = new_masks
    print("Found", masks.keys())


def capture_image_and_masks(picam2: Picamera2, model, colour_file, label_file):
    global masks
    # Disable Aec and Awb so all images have the same exposure and colour gains
    picam2.set_controls({"AeEnable": False, "AwbEnable": False})
    time.sleep(1.0)
    request = picam2.capture_request()
    image = request.make_image("main")
    lores = request.make_buffer("lores")
    stride = picam2.stream_configuration("lores")["stride"]
    grey = lores[:stride * lowresSize[1]].reshape((lowresSize[1], stride))

    InferenceTensorFlow(grey, model, colour_file, label_file)
    for k, v in masks.items():
        comp = np.array([0, 0, 0, 0]).reshape(1, 1, 4)
        mask = (~((v == comp).all(axis=-1)) * 255).astype(np.uint8)
        label = k
        label = label.replace(" ", "_")
        if label in captured:
            label = f"{label}{sum(label in x for x in captured)}"
        cv2.imwrite(f"mask_{label}.png", mask)
        image.save(f"img_{label}.png")
        captured.append(label)
    print(masks.keys())


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', help='Path of the segmentation model.', required=True)
    parser.add_argument('--label', help='Path of the labels file.')
    parser.add_argument('--colours', help='File path of the label colours.')
    parser.add_argument('--output', help='File path of the output image.')
    args = parser.parse_args()

    if args.output:
        output_file = args.output
    else:
        output_file = 'out.png'

    if args.label:
        label_file = args.label
    else:
        label_file = None

    if args.colours:
        colour_file = args.colours
    else:
        colour_file = "colours.txt"

    picam2 = Picamera2()
    picam2.start_preview(Preview.QTGL)
    config = picam2.create_preview_configuration(main={"size": normalSize},
                                                 lores={"size": lowresSize, "format": "YUV420"})
    picam2.configure(config)

    stride = picam2.stream_configuration("lores")["stride"]

    picam2.start()

    try:
        while True:
            buffer = picam2.capture_buffer("lores")
            grey = buffer[:stride * lowresSize[1]].reshape((lowresSize[1], stride))
            InferenceTensorFlow(grey, args.model, colour_file, label_file)
            overlay = np.zeros((normalSize[1], normalSize[0], 4), dtype=np.uint8)
            global masks
            for v in masks.values():
                overlay += v
            # Set Alphas and overlay
            overlay[:, :, -1][overlay[:, :, -1] == 255] = 150
            picam2.set_overlay(overlay)
            # Check if enter has been pressed
            i, o, e = select.select([sys.stdin], [], [], 0.1)
            if i:
                input()
                capture_image_and_masks(picam2, args.model, colour_file, label_file)
                picam2.stop()
                if input("Continue (y/n)?").lower() == "n":
                    raise KeyboardInterrupt
                picam2.start()
    except KeyboardInterrupt:
        print(f"Have captured {captured}")
        todo = input("What to composite?")
        bg = input("Which image to use as background (empty for none)?")
        todo = todo.split()
        images = []
        masks = []
        if bg:
            base_image = Image.open(f"img_{bg}.png")
        else:
            base_image = np.zeros((normalSize[1], normalSize[0], 3), dtype=np.uint8)
            base_image = Image.fromarray(base_image)
        for item in todo:
            images.append(Image.open(f"img_{item}.png"))
            masks.append(Image.open(f"mask_{item}.png"))
        for i in range(len(masks)):
            base_image = Image.composite(images[i], base_image, masks[i])
        base_image.save(output_file)


if __name__ == '__main__':
    main()

You will need to download all the files from https://github.com/raspberrypi/picamera2/tree/main/examples/tensorflow.

Then run it from the command line:

python segmentation.py --model deeplapv3.tflite --label deeplab_labels.txt

Overall, the results are underwhelming: the segmentation edges are not precise, and the set of objects it can segment is too small.

TFLite detection

#!/usr/bin/python3

# Copyright (c) 2022 Raspberry Pi Ltd
# Author: Alasdair Allan <alasdair@raspberrypi.com>
# SPDX-License-Identifier: BSD-3-Clause

# A TensorFlow Lite example for Picamera2 on Raspberry Pi OS Bullseye
#
# Install necessary dependences before starting,
#
# $ sudo apt update
# $ sudo apt install build-essential
# $ sudo apt install libatlas-base-dev
# $ sudo apt install python3-pip
# $ pip3 install tflite-runtime
# $ pip3 install opencv-python==4.4.0.46
# $ pip3 install pillow
# $ pip3 install numpy
#
# and run from the command line,
#
# $ python3 real_time_with_labels.py --model mobilenet_v2.tflite --label coco_labels.txt

import argparse

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite

from picamera2 import MappedArray, Picamera2, Preview

normalSize = (640, 480)
lowresSize = (320, 240)

rectangles = []


def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret


def DrawRectangles(request):
    with MappedArray(request, "main") as m:
        for rect in rectangles:
            print(rect)
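            # Boxes were computed on the lores stream; scale by 2 (640/320) to map them onto the main stream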
            rect_start = (int(rect[0] * 2) - 5, int(rect[1] * 2) - 5)
            rect_end = (int(rect[2] * 2) + 5, int(rect[3] * 2) + 5)
            cv2.rectangle(m.array, rect_start, rect_end, (0, 255, 0, 0))
            if len(rect) == 5:
                text = rect[4]
                font = cv2.FONT_HERSHEY_SIMPLEX
                cv2.putText(m.array, text, (int(rect[0] * 2) + 10, int(rect[1] * 2) + 10),
                            font, 1, (255, 255, 255), 2, cv2.LINE_AA)


def InferenceTensorFlow(image, model, output, label=None):
    global rectangles

    if label:
        labels = ReadLabelFile(label)
    else:
        labels = None

    interpreter = tflite.Interpreter(model_path=model, num_threads=4)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    height = input_details[0]['shape'][1]
    width = input_details[0]['shape'][2]
    floating_model = False
    if input_details[0]['dtype'] == np.float32:
        floating_model = True

    rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    initial_h, initial_w, channels = rgb.shape

    picture = cv2.resize(rgb, (width, height))

    input_data = np.expand_dims(picture, axis=0)
    if floating_model:
        input_data = (np.float32(input_data) - 127.5) / 127.5

    interpreter.set_tensor(input_details[0]['index'], input_data)

    interpreter.invoke()

    detected_boxes = interpreter.get_tensor(output_details[0]['index'])
    detected_classes = interpreter.get_tensor(output_details[1]['index'])
    detected_scores = interpreter.get_tensor(output_details[2]['index'])
    num_boxes = interpreter.get_tensor(output_details[3]['index'])

    rectangles = []
    for i in range(int(num_boxes)):
        top, left, bottom, right = detected_boxes[0][i]
        classId = int(detected_classes[0][i])
        score = detected_scores[0][i]
        if score > 0.5:
            xmin = left * initial_w
            ymin = bottom * initial_h
            xmax = right * initial_w
            ymax = top * initial_h
            box = [xmin, ymin, xmax, ymax]
            rectangles.append(box)
            if labels:
                print(labels[classId], 'score = ', score)
                rectangles[-1].append(labels[classId])
            else:
                print('score = ', score)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', help='Path of the detection model.', required=True)
    parser.add_argument('--label', help='Path of the labels file.')
    parser.add_argument('--output', help='File path of the output image.')
    args = parser.parse_args()

    if (args.output):
        output_file = args.output
    else:
        output_file = 'out.jpg'

    if (args.label):
        label_file = args.label
    else:
        label_file = None

    picam2 = Picamera2()
    picam2.start_preview(Preview.QTGL)
    config = picam2.create_preview_configuration(main={"size": normalSize},
                                                 lores={"size": lowresSize, "format": "YUV420"})
    picam2.configure(config)

    stride = picam2.stream_configuration("lores")["stride"]
    picam2.post_callback = DrawRectangles

    picam2.start()

    while True:
        buffer = picam2.capture_buffer("lores")
        grey = buffer[:stride * lowresSize[1]].reshape((lowresSize[1], stride))
        _ = InferenceTensorFlow(grey, args.model, output_file, label_file)


if __name__ == '__main__':
    main()

Overall, the real-time performance is quite good, but the detection accuracy is poor.