观前提醒:本期内容为ESP32的FreeRTOS、ESP32多核任务编程。

项目预告:一个无需计算机板卡,可以多机互动的,百元左右的物联ROS底盘车

ESP32多线程编程深入解析

在物联网(IoT)的项目中,多任务处理是一个常见的需求。ESP32,作为一款性能强大的微控制器,它内置了双核处理器以及支持多线程的FreeRTOS操作系统,使得同时处理多个任务成为可能。本文将从浅入深地解析ESP32的多线程编程方法和其特点。

一、ESP32S3多核功能简介

ESP32-S3 是 Espressif Systems 开发的一款低功耗微控制器,它是 ESP32 系列的一部分,专为通用低功耗设备、物联网 (IoT) 应用和智能家居等领域设计。

ESP32-S3 采用了双核 Xtensa® 32-bit LX7 微处理器,这意味着它有两个处理核心可以同时运行任务,提高了处理能力和多任务处理的效率。这两个核心可以独立运行,也可以协同工作,处理复杂的计算任务或多个并发操作。

在使用 ESP32-S3 进行编程时,可以指定特定的任务运行在哪个核心上。这可以通过使用 FreeRTOS(实时操作系统)来实现,FreeRTOS 是 ESP32-S3 常用的操作系统。

二、FreeRTOS简介

FreeRTOS是一个迷你的实时操作系统内核,为多任务处理提供了丰富的API。在ESP32上,FreeRTOS已经被高度集成和优化,可以充分利用ESP32的双核处理器。FreeRTOS的主要特点包括:

  • 任务管理:可以创建、删除、挂起和恢复多个任务。
  • 同步原语:包括互斥量、信号量、事件标志等,用于任务间的同步。
  • 内存管理:提供动态内存分配和释放。
  • 定时器:提供软件定时器,用于定时任务的执行。

简要介绍一下FreeRTOS在ESP32上的底层实现:

FreeRTOS在ESP32上的底层实现涉及到硬件抽象、中断管理、多核调度等方面

  • 硬件抽象层(HAL)

FreeRTOS为不同的硬件平台提供了硬件抽象层,这样FreeRTOS的核心代码可以在不同的硬件上运行而无需修改。在ESP32上,HAL包括对CPU、定时器、中断控制器等硬件资源的抽象。这些抽象使得FreeRTOS能够利用ESP32的硬件特性,如定时器用于调度器的时基,中断控制器用于管理中断。

  • 中断管理

ESP32的中断管理是通过它的中断控制器来实现的。FreeRTOS定义了一套中断服务例程(ISR)的编写规则,这些规则确保了ISR可以安全地与FreeRTOS的任务和内核对象交互。例如,FreeRTOS提供了portENTER_CRITICAL和portEXIT_CRITICAL宏来在ISR中保护临界区,防止任务切换时数据不一致。

  • 多核调度

ESP32的FreeRTOS利用了双核心的特性来实现多核调度。FreeRTOS的调度器被设计为可以在多核环境下运行,每个核可以运行自己的任务。FreeRTOS在ESP32上的实现使用了特殊的同步机制,如“spinlock”锁,来同步两个核上的任务状态。

  • 任务切换和上下文保存

任务切换是多任务操作系统的核心,它允许CPU在不同的任务之间切换执行。在ESP32上,FreeRTOS的任务切换涉及到保存和恢复任务的上下文,包括CPU寄存器等状态信息。FreeRTOS使用汇编语言编写的上下文切换函数portSAVE_CONTEXT和portRESTORE_CONTEXT来处理这些操作,确保任务切换时能够正确保存和恢复每个任务的状态。

三、ESP32多线程编程基础

基础多线程例程:

#include <Arduino.h>

// 定义任务句柄
TaskHandle_t Task1;
TaskHandle_t Task2;

// 定义任务函数
void Task1code( void * pvParameters ){
  Serial.print("Task1 running on core ");
  Serial.println(xPortGetCoreID());
  for(;;){
    Serial.println("This is Task1");
    vTaskDelay(1000 / portTICK_PERIOD_MS); // 延迟1秒
  }
}

void Task2code( void * pvParameters ){
  Serial.print("Task2 running on core ");
  Serial.println(xPortGetCoreID());
  for(;;){
    Serial.println("This is Task2");
    vTaskDelay(2000 / portTICK_PERIOD_MS); // 延迟2秒
  }
}

void setup() {
  Serial.begin(115200);
  
  // 创建任务
  xTaskCreatePinnedToCore(
    Task1code, /* 任务函数 */
    "Task1",   /* 任务名字 */
    10000,     /* 栈大小 */
    NULL,      /* 传递给任务函数的参数 */
    1,         /* 优先级 */
    &Task1,    /* 任务句柄 */
    0);        /* 核心编号 */

  xTaskCreatePinnedToCore(
    Task2code,
    "Task2",
    10000,
    NULL,
    1,
    &Task2,
    1);
}

void loop() {
  // 在这里不需要做任何事情,因为所有的工作都在任务中完成
}

在这个例程中,我们创建了两个任务(Task1和Task2)。每个任务都在不同的核心上运行(ESP32S3有两个核心)。每个任务都会打印一条消息,然后延迟一段时间。Task1每秒打印一次消息,而Task2每两秒打印一次消息。

可以看到Task1运行在core 0,Task2运行在core 1

简单分析一下以上的程序,我们可以看到,实现多线程的主要函数只有两个:

Task1code与xTaskCreatePinnedToCore

Task1code函数:

Task1code是任务函数,用于在FreeRTOS任务中执行。在这个例程中,它被用作任务1的主体。下面是对这个函数的详细分析:

void Task1code( void * pvParameters ){
  Serial.print("Task1 running on core ");
  Serial.println(xPortGetCoreID());
  for(;;){
    Serial.println("This is Task1");
    vTaskDelay(1000 / portTICK_PERIOD_MS); // 延迟1秒
  }
}
  • 函数定义:void Task1code( void * pvParameters )。这个函数没有返回值(void),并接受一个void *类型的参数。这个参数可以用来传递任何类型的数据给任务,但在这个例程中我们并没有使用它。
  • 打印任务信息:Serial.print("Task1 running on core "); 和 Serial.println(xPortGetCoreID());。这两行代码打印出一条消息,说明任务1正在哪个核心上运行。xPortGetCoreID()函数返回当前任务正在运行的核心编号。
  • 无限循环:for(;;){...}。这是一个无限循环,任务会在这个循环中一直运行,直到被删除或者ESP32S3重启。
  • 打印任务消息:Serial.println("This is Task1");。这行代码在每次循环中打印一条消息,说明这是任务1。
  • 延迟:vTaskDelay(1000 / portTICK_PERIOD_MS);。这行代码使任务延迟1秒(1000毫秒)。portTICK_PERIOD_MS是FreeRTOS中的一个常量,表示一个时钟滴答的毫秒数。vTaskDelay()函数使任务进入阻塞状态,直到指定的时钟滴答数过去。在这个例程中,我们使用vTaskDelay()函数来控制任务的执行频率。

xTaskCreatePinnedToCore函数:

xTaskCreatePinnedToCore是FreeRTOS库中的一个函数,用于在指定的核心上创建一个新的任务。在ESP32S3这样的多核处理器上,这个函数非常有用,它允许你控制每个任务在哪个核心上运行。

BaseType_t xTaskCreatePinnedToCore(
  TaskFunction_t pvTaskCode,
  const char * const pcName,
  const uint32_t usStackDepth,
  void * const pvParameters,
  UBaseType_t uxPriority,
  TaskHandle_t * const pxCreatedTask,
  const BaseType_t xCoreID
);
  • pvTaskCode:这是一个指向任务函数的指针。任务函数是任务的主体,它包含了任务需要执行的代码。
  • pcName:这是任务的名字,是一个以null结尾的字符串。任务的名字主要用于调试。
  • usStackDepth:这是任务栈的大小,以字为单位。任务栈用于存储局部变量和函数调用的返回地址。如果任务栈太小,可能会导致栈溢出;如果任务栈太大,可能会浪费内存。
  • pvParameters:这是一个指针,用于传递参数给任务函数。这个参数可以是任何类型的数据,但在任务函数中,它总是被视为void *类型。
  • uxPriority:这是任务的优先级。在FreeRTOS中,数字越大,优先级越高。当多个任务都处于就绪状态时,优先级最高的任务会被优先执行。
  • pxCreatedTask:这是一个指向任务句柄的指针。任务句柄是一个用于引用任务的“句柄”。你可以使用任务句柄来控制任务,例如删除任务或更改任务的优先级。
  • xCoreID:这是任务应该运行的核心的编号。在ESP32S3上,这个编号可以是0或1,分别代表两个核心。如果你想让FreeRTOS自动选择一个核心,可以将这个参数设置为tskNO_AFFINITY。

任务调度

在ESP32上,FreeRTOS支持抢升式调度和时间片调度。抢升式调度意味着任何时候只要有更高优先级的任务就绪,它就会立即抢占当前正在运行的任务。而时间片调度则允许具有相同优先级的任务公平地共享CPU时间。

同步机制

在多任务环境中,任务同步是另一个重要问题。ESP32中的FreeRTOS提供了多种任务同步机制,包括信号量、互斥量和事件组等。

  • 信号量(Semaphore):信号量是一种用于保护共享资源的同步机制。当一个任务需要访问共享资源时,它必须首先获得信号量。如果信号量已经被其他任务占用,那么这个任务就会被阻塞,直到信号量可用。
  • 互斥量(Mutex):互斥量是一种特殊的信号量,主要用于保护共享资源,防止同时访问。与信号量不同,互斥量有所有权的概念,只有占有互斥量的任务才能释放互斥量。
  • 事件组(Event Group):事件组是一种用于同步多个任务的机制。每个事件组包含一组事件位,任务可以等待一个或多个事件位被设置。当事件位被设置时,等待这些事件位的任务就会被唤醒。

四、ESP32多线程编程进阶

核心亲和性

ESP32是一款双核微控制器,这意味着它有两个CPU核心可以并行处理任务。然而,并不是所有的任务都适合在任何核心上运行。有些任务可能需要频繁地访问某些特定的硬件资源,而这些资源可能只能由特定的核心访问。这就引入了一个概念,叫做核心亲和性(Core Affinity)。

核心亲和性是指任务对运行在特定CPU核心上的偏好。在FreeRTOS中,你可以使用xTaskCreatePinnedToCore()函数来设置任务的核心亲和性。例如,以下的代码创建了一个任务,并将其固定在0号核心上运行:

void taskCode(void * parameter) {
  for (;;) {
    // 任务代码
  }
}

void setup() {
  xTaskCreatePinnedToCore(
    taskCode,          // 任务函数
    "TaskName",        // 任务名字
    10000,             // 栈大小
    NULL,              // 传递给任务函数的参数
    1,                 // 优先级
    NULL,              // 任务句柄
    0                  // CPU核心
  );
}

在这个例子中,任务taskCode将始终在0号CPU核心上运行,无论1号CPU核心是否空闲。这可以确保taskCode始终有足够的CPU时间来执行,而不会被其他在1号CPU核心上运行的任务干扰。

任务队列

在多任务环境中,任务通常需要通过某种方式来交换数据。FreeRTOS提供了一种叫做任务队列(Task Queue)的机制来实现这一点。

任务队列是一种先入先出(FIFO)的数据结构,任务可以将数据项发送到队列,也可以从队列接收数据项。当队列为空时,试图从队列接收数据的任务将被阻塞,直到有其他任务发送数据到队列。同样,当队列已满时,试图向队列发送数据的任务也将被阻塞,直到有其他任务从队列接收数据。

以下是一个使用任务队列的例子:

QueueHandle_t queue;

void senderTask(void * parameter) {
  int item = 0;
  for (;;) {
    xQueueSend(queue, &item, portMAX_DELAY);
    item++;
  }
}

void receiverTask(void * parameter) {
  int item;
  for (;;) {
    xQueueReceive(queue, &item, portMAX_DELAY);
    Serial.println(item);
  }
}

void setup() {
  queue = xQueueCreate(10, sizeof(int));
  xTaskCreate(senderTask, "Sender", 10000, NULL, 1, NULL);
  xTaskCreate(receiverTask, "Receiver", 10000, NULL, 1, NULL);
}

在这个例子中,我们创建了一个队列和两个任务。senderTask任务不断地向队列发送数据,而receiverTask任务则不断地从队列接收数据。当队列为空时,receiverTask任务将被阻塞,直到senderTask任务发送数据到队列。当队列已满时,senderTask任务将被阻塞,直到receiverTask任务从队列接收数据。

总的来说,任务队列是一种强大的工具,它可以帮助你在任务之间传递数据,同步任务的执行,以及管理共享资源。通过合理地使用任务队列,你可以构建出复杂的多任务应用。

任务通知

任务通知是一种轻量级、高效的通信机制,它可以用来唤醒一个或多个任务。每个任务都有一个关联的通知值,任务可以等待其通知值被设置,也可以修改其通知值。

任务通知的一个常见用途是在中断服务例程(ISR)中唤醒一个任务。例如,当一个外部中断触发时,ISR可以通过设置任务的通知值来唤醒该任务,然后该任务可以处理中断事件。

以下是一个使用任务通知的例子:

void taskCode(void * parameter) {
  uint32_t notificationValue;
  for (;;) {
    xTaskNotifyWait(0, 0, &notificationValue, portMAX_DELAY);
    // 处理通知
  }
}

void isr() {
  BaseType_t xHigherPriorityTaskWoken = pdFALSE;
  vTaskNotifyGiveFromISR(taskHandle, &xHigherPriorityTaskWoken);
  portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}

void setup() {
  xTaskCreate(taskCode, "TaskName", 10000, NULL, 1, &taskHandle);
  attachInterrupt(digitalPinToInterrupt(pin), isr, RISING);
}

在这个例子中,当引脚上的电平上升时,ISR会唤醒taskCode任务。taskCode任务在收到通知后可以处理中断事件。

互斥量

互斥量(Mutex)是一种同步机制,它可以用来保护共享资源,防止多个任务同时访问。当一个任务获得了互斥量,其他任务就不能获得该互斥量,直到该任务释放互斥量。

以下是一个使用互斥量的例子:

SemaphoreHandle_t mutex;

void taskCode(void * parameter) {
  for (;;) {
    xSemaphoreTake(mutex, portMAX_DELAY);
    // 访问共享资源
    xSemaphoreGive(mutex);
  }
}

void setup() {
  mutex = xSemaphoreCreateMutex();
  xTaskCreate(taskCode, "TaskName1", 10000, NULL, 1, NULL);
  xTaskCreate(taskCode, "TaskName2", 10000, NULL, 1, NULL);
}

在这个例子中,两个任务都试图访问同一个共享资源。通过使用互斥量,我们可以确保在任何时候只有一个任务可以访问该资源,从而防止数据竞争和其他并发问题。

信号量

信号量(Semaphore)是一种计数同步机制,它可以用来控制对一组共享资源的访问。信号量的值表示可用资源的数量。当一个任务获得了信号量,信号量的值就会减一;当一个任务释放了信号量,信号量的值就会加一。

以下是一个使用信号量的例子:

SemaphoreHandle_t semaphore;

void taskCode(void * parameter) {
  for (;;) {
    xSemaphoreTake(semaphore, portMAX_DELAY);
    // 访问共享资源
    xSemaphoreGive(semaphore);
  }
}

void setup() {
  semaphore = xSemaphoreCreateCounting(3, 3);
  xTaskCreate(taskCode, "TaskName1", 10000, NULL, 1, NULL);
  xTaskCreate(taskCode, "TaskName2", 10000, NULL, 1, NULL);
  xTaskCreate(taskCode, "TaskName3", 10000, NULL, 1, NULL);
}

在这个例子中,我们有三个任务和三个共享资源。通过使用信号量,我们可以确保在任何时候最多只有三个任务可以访问共享资源。

五、实战代码

这是一个物联网花盆的代码,它运行在ESP32S3上,可以上传传感器信息至服务器,并从服务器接受控制信息

#include <Arduino.h>

#include <WiFi.h>
#include <PubSubClient.h>

#include "DHT.h"

#include <Wire.h>
#include <Adafruit_Sensor.h>
#include <Adafruit_BMP280.h>

#include <BH1750.h>

const char *ssid = "809";
const char *password = "809809809";
const char *mqttServer = "192.168.3.222";
const int mqttPort = 1883;

#define DHTPIN 4      // Digital pin connected to the DHT sensor
#define DHTTYPE DHT11 // DHT 11
DHT dht(DHTPIN, DHTTYPE);

#define soil 10

#define STBY 38
#define AIN1 36
#define AIN2 39

#define sun_pin 15

#define SCL_PIN 8
#define SDA_PIN 9

#define Sub "js"
#define Pub "cgq"

const int flowPin = 47;           
volatile int flowPulseCount;  
const float pulsesPerLitre = 4834.0; 
Adafruit_BMP280 bmp;                

int sensorValue; 
long sum = 0;    
int vout = 0;  
int uv = 0;    

// PWM
int freq = 5000;    
int channel = 10;  
int resolution = 8;

float Humidity; // = dht.readHumidity();
// Humidity= dht.readHumidity();
float Temperature; // = dht.readTemperature(); 
// Temperature= dht.readTemperature();
double soil_moisture; 
// soil_moisture= analogRead(soil);
uint16_t lux; //= lightMeter.readLightLevel();
// uint16_t lux = lightMeter.readLightLevel();
float BMP_Temperature; //= bmp.readTemperature()
// BMP_Temperature = bmp.readTemperature()
float BMP_Pressure; // bmp.readPressure()
// BMP_Pressure = bmp.readPressure()
float BMP_Approxaltitude; //=bmp.readAltitude(1013.25)
// BMP_Approxaltitude = bmp.readAltitude(1013.25)
float S_vout; //= vout;
// S_vout = vout;

WiFiClient espClient;
PubSubClient client(espClient);

void pulseCounter()
{
  flowPulseCount++;
}

int readuv()
{
  sensorValue = 0;
  sum = 0;
  for (int i = 0; i < 1024; i++) 
  {
    sensorValue = analogRead(15);
    sum = sensorValue + sum;
    delay(2);
  }
  vout = sum >> 10; 
  vout = vout * 4980.0 / 1024;

  if (vout < 50)
  { 
    uv = 0;
  }
  else if (vout < 227)
  {
    uv = 1;
  }
  else if (vout < 318)
  {
    uv = 2;
  }
  else if (vout < 408)
  {
    uv = 3;
  }
  else if (vout < 503)
  {
    uv = 4;
  }
  else if (vout < 606)
  {
    uv = 5;
  }
  else if (vout < 696)
  {
    uv = 6;
  }
  else if (vout < 795)
  {
    uv = 7;
  }
  else if (vout < 881)
  {
    uv = 8;
  }
  else if (vout < 976)
  {
    uv = 9;
  }
  else if (vout < 1079)
  {
    uv = 10;
  }
  else
  {
    uv = 11;
  }
  delay(20);
  return uv;
}

void callback(char *topic, byte *payload, unsigned int length)
{
  Serial.println("Entered callback function"); 
  int message;
  for (int i = 0; i < length; i++)
  {
    message = payload[0]-48;
    
  }
  Serial.println(message);
  if (message)
  {
  digitalWrite(37, HIGH); 
  Serial.println("high");
  }
  if (!message)
  {
  digitalWrite(37, LOW); 
   Serial.println("low");
  }
 
  
}

void mqttSubscriberTask(void *pvParameters)
{
  (void)pvParameters;
  Serial.println("Entered callback function111");
  for (;;)
  {
    client.loop();
    // ledcWrite(LEDC_CHANNEL_1, motorSpeed);
    vTaskDelay(10 / portTICK_PERIOD_MS); // wait for 10 ms
  }
}

void mqttPublisherTask(void *pvParameters)
{
  (void)pvParameters;
  pinMode(flowPin, INPUT_PULLUP);                                    
  attachInterrupt(digitalPinToInterrupt(flowPin), pulseCounter, RISING);
  for (;;)
  {
    Humidity = dht.readHumidity();
    Temperature = dht.readTemperature();
    // Pressure= bmp280.readPressure()/ 100.0F;
    soil_moisture = analogRead(soil);
    BMP_Temperature = bmp.readTemperature();
    BMP_Pressure = bmp.readPressure();
    BMP_Approxaltitude = bmp.readAltitude(1013.25);
    S_vout = vout;

    String sensorData = "{"; 
    sensorData += "\"temperature\":";
    sensorData += String(Temperature);
    sensorData += ",\"humidity\":";
    sensorData += String(Humidity);
    sensorData += ",\"soil_moisture\":";
    sensorData += String(soil_moisture);
    sensorData += ",\"BMP_Temperature\":";
    sensorData += String(BMP_Temperature);
    sensorData += ",\" BMP_Pressure\":";
    sensorData += String(BMP_Pressure);
    sensorData += ",\" BMP_Approxaltitude\":";
    sensorData += String(BMP_Approxaltitude);
    sensorData += ",\" The Photocurrent value : \":";
    sensorData += String(S_vout) + "mV";
    sensorData += ",\" UV Index =  \":";    
    sensorData += String(readuv());
    sensorData += ",\" flowPulseCount =  \":";
    sensorData += String(flowPulseCount);
    sensorData += "}";

    client.publish(Pub, sensorData.c_str());
    Serial.println(sensorData);

    vTaskDelay(500 / portTICK_PERIOD_MS); // wait for 5000 ms
  }
}

void setup()
{
  Serial.begin(115200);

  WiFi.begin(ssid, password);

  ledcSetup(channel, freq, resolution); 
  ledcAttachPin(AIN1, channel);        

  Wire.begin(SDA_PIN, SCL_PIN); // I2C��ʼ
  if (!bmp.begin(0x76))
  {
    Serial.println(F("Could not find a valid BMP280 sensor, check wiring!"));
  }

  /* Default settings from datasheet. */
  bmp.setSampling(Adafruit_BMP280::MODE_NORMAL,     /* Operating Mode. */
                  Adafruit_BMP280::SAMPLING_X2,     /* Temp. oversampling */
                  Adafruit_BMP280::SAMPLING_X16,    /* Pressure oversampling */
                  Adafruit_BMP280::FILTER_X16,      /* Filtering. */
                  Adafruit_BMP280::STANDBY_MS_500); /* Standby time. */

  while (WiFi.status() != WL_CONNECTED)
  {
    delay(1000);
    Serial.println("Connecting to WiFi...");
  }
  Serial.println("Connected to the WiFi network");

  client.setServer(mqttServer, mqttPort);
  while (!client.connected())
  {
    Serial.println("Connecting to MQTT...");

    if (client.connect("ESP32Client"))
    {
      Serial.println("connected");
      client.subscribe(Sub);
    }
    else
    {
      Serial.print("failed with state ");
      Serial.print(client.state());
      delay(2000);
    }
  }

  // pinMode(35, OUTPUT);
  pinMode(11, OUTPUT);
  pinMode(12, OUTPUT);
  pinMode(18, OUTPUT);
  pinMode(17, OUTPUT);
  pinMode(37, OUTPUT);
  pinMode(AIN1, OUTPUT);
  pinMode(STBY, OUTPUT);
  pinMode(AIN2, OUTPUT);
  digitalWrite(STBY, HIGH);
  digitalWrite(AIN2, LOW);

  pinMode(48, OUTPUT);
  pinMode(45, OUTPUT);
  digitalWrite(45, HIGH);
  digitalWrite(48, LOW);

  analogReadResolution(12);

  dht.begin(); 
  client.setCallback(callback);


  digitalWrite(12, LOW);
  digitalWrite(11, HIGH);

  digitalWrite(18, LOW);
  digitalWrite(17, HIGH);

  xTaskCreatePinnedToCore(
      mqttSubscriberTask,   /* Task function. */
      "mqttSubscriberTask", /* Name of task. */
      10000,                /* Stack size in words. */
      NULL,                 /* Parameter passed as input of the task */
      1,                    /* Priority of the task. */
      NULL,
      0); /* Task handle. */

  xTaskCreatePinnedToCore(
      mqttPublisherTask,   /* Task function. */
      "mqttPublisherTask", /* Name of task. */
      10000,               /* Stack size in words. */
      NULL,                /* Parameter passed as input of the task */
      1,                   /* Priority of the task. */
      NULL,
      1); /* Task handle. */
}

void loop()
{

}

运行程序可以看到ESP32发送的cgq话题信息