进阶篇:让摄像头动起来!ESP32+SG90打造智能云台系统


预计阅读时间:24 分钟

进阶篇:让摄像头动起来!ESP32+SG90打造智能云台系统

上一篇我们搞定了家庭监控系统,但固定视角还是不够灵活。今天我们更进一步,给摄像头加上"脖子",让它能上下左右转动,实现360度无死角监控!


🎯 项目升级亮点

3D打印云台:专业外观,安装方便
ESP32控制核心:WiFi连接,响应迅速
双舵机驱动:上下左右全方位转动
手机APP控制:指尖操控,随心所欲
低成本改造:总成本约40元


📦 准备工作:需要这些材料

硬件清单(约40元)

  1. ESP32开发板 💰 16元
  2. 推荐ESP32-WROOM-32或ESP32-DevKitC(我用的是esp32c3-supermini开发板)
  3. 内置WiFi和蓝牙,功能强大

  4. SG90舵机 x2 💰 10元

  5. 一个控制左右转动(水平方向)
  6. 一个控制上下转动(垂直方向)

  7. USB电源适配器 💰 5元

  8. 5V 2A输出,给舵机供电
  9. 可以用旧手机充电器

  10. 3D打印云台组件 💰 1-5元

  11. 或者用纸板手工制作
  12. 总重量不到100克

软件工具

  • VS Code:代码编辑器
  • ESP-IDF:ESP32官方开发框架
  • Arduino IDE(可选):如果习惯用Arduino开发
  • Trae:AI编码助手

💡 推荐工具:开发者的瑞士军刀

在开发过程中,经常需要各种转换、加密、编码等工具,推荐一个超好用的开发工具网站:liufly.top

这是一个免费的开发工具集,功能超级丰富:

🔐 加密与安全类:Token生成器、Hash文本(MD5/SHA1-SHA512等)、Bcrypt加密对比、UUID/ULID生成、RSA密钥对生成、BIP39助记词生成、OTP代码生成校验、IBAN验证解析

🔄 编码转换类:Base64字符串/文件编码解码、整数进制转换(十进制/十六进制/二进制等)、罗马数字转换、ASCII二进制/Unicode互转、URL编码解码、HTML实体转义

📝 格式转换类:YAML/JSON/TOML/XML两两互转、Markdown转HTML、JSON转CSV、Docker run转Docker Compose、Outlook Safelink解码

✏️ 文本处理类:大小写转换、文本转北约音标、字符串混淆、文本差异对比、ASCII艺术生成、Numeronym生成(如i18n)、Lorem Ipsum生成、文本统计(字符/字数)

🌐 网络与设备类:URL解析、HTTP状态码查询、IPv4子网计算/地址转换/范围扩展、MAC地址查询/生成、IPv6 ULA生成、设备信息获取、Basic Auth生成

🛠️ 开发辅助类:密码强度分析、PDF签名校验、JWT解析、Git速查表、Crontab生成、Regex测试/速查表、Chmod权限计算、User-Agent解析

🏠 生活与办公类:QR码/WiFi QR码生成、SVG占位图生成、摄像头录制、Emoji选择器、手机号解析格式化、邮箱标准化

🔢 计算与统计类:数学计算

工具多到数不完,全部免费使用,强烈推荐收藏!👉 立即访问 liufly.top


🔧 第一步:3D打印云台

获取模型

我们使用现成的3D模型,不用自己设计:

📁 模型地址https://www.thingiverse.com/thing:2892903

这个模型适配SG90舵机,包含: - 底座 - 云台支架 - 摄像头固定座

打印设置

  • 材料:PLA或PETG
  • 填充率:20%
  • 层高:0.2mm

组装说明

  1. 打印完成后,清理支撑和毛边
  2. 将水平舵机安装在底座上
  3. 将垂直舵机安装在支架上
  4. 最后把摄像头固定在座子上
  5. 确保所有部件转动顺畅,没有卡顿

没有3D打印机怎么办?

可以用硬纸板或亚克力板手工制作,原理相同,就是外观不那么精致。


🔧 第二步:ESP32硬件连接

接线图

ESP32                  SG90舵机1(水平)
GPIO9  ───────────────  PWM信号(橙色)
5V      ───────────────  VCC(红色)
GND     ───────────────  GND(棕色)

ESP32                  SG90舵机2(垂直)
GPIO10  ───────────────  PWM信号(橙色)
5V      ───────────────  VCC(红色)
GND     ───────────────  GND(棕色)

注意事项

⚠️ 电源说明: - ESP32可以用USB供电(5V) - 两个舵机需要独立的5V电源(USB电源适配器) - 所有设备的GND必须共地! - 不要用ESP32的3.3V给舵机供电,会烧坏芯片


🔧 第三步:ESP32软件开发

环境搭建

  1. 安装VS Code
  2. 安装ESP-IDF扩展
  3. 在VS Code扩展商店搜索"ESP-IDF"
  4. 点击安装,按照提示完成配置

编写代码

创建新项目,写入以下代码:

/* BSD Socket API Example

   This example code is in the Public Domain (or CC0 licensed, at your option.)

   Unless required by applicable law or agreed to in writing, this
   software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
   CONDITIONS OF ANY KIND, either express or implied.
*/
#include <string.h>
#include <sys/param.h>
#include <ctype.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_system.h"
#include "esp_wifi.h"
#include "esp_event.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "esp_netif.h"
#include "protocol_examples_common.h"
#include <driver/ledc.h>

#include "lwip/err.h"
#include "lwip/sockets.h"
#include "lwip/sys.h"
#include <lwip/netdb.h>
#include "driver/gpio.h"

#define GPIO_OUTPUT_IO_2    8
#define PORT                        CONFIG_EXAMPLE_PORT
#define KEEPALIVE_IDLE              CONFIG_EXAMPLE_KEEPALIVE_IDLE
#define KEEPALIVE_INTERVAL          CONFIG_EXAMPLE_KEEPALIVE_INTERVAL
#define KEEPALIVE_COUNT             CONFIG_EXAMPLE_KEEPALIVE_COUNT

#define SERVO_PIN           9          // 舵机信号引脚
#define SERVO_PIN_H         10         // 水平舵机信号引脚(可修改)
#define LEDC_TIMER          LEDC_TIMER_0
#define LEDC_MODE           LEDC_LOW_SPEED_MODE
#define LEDC_CHANNEL        LEDC_CHANNEL_0
#define LEDC_CHANNEL_H      LEDC_CHANNEL_1
#define LEDC_DUTY_RES       LEDC_TIMER_13_BIT // 分辨率 
#define LEDC_FREQ_HZ        50                 // 50 Hz
#define sg90_min_pulse_width_us           500// 10% 占空比
#define sg90_max_pulse_width_us           2500// 10% 占空比
static const char *TAG = "SG90";
// 设置舵机角度(0 ~ 180)
static void set_servo_angle_channel(int angle, int channel) {
    if (angle < 0) angle = 0;
    if (angle > 180) angle = 180;

    // 映射角度到脉宽(微秒):0°=500us, 180°=2500us
    int pulse_us = 500 + (2000 * angle / 180);  // 线性插值
    // uint16_t pulse_width_us=(sg90_min_pulse_width_us+(sg90_max_pulse_width_us-sg90_min_pulse_width_us)*(angle /180));

    // int pulse_us = 600 + (1800 * angle / 180);  // 1800 = 2400 - 600
    // 计算占空比:duty = (pulse_us / period_us) * (2^resolution)
    // period_us = 1e6 / freq = 20000 us
    // uint32_t duty = (uint32_t)((pulse_us * ((uint64_t)1 << LEDC_DUTY_RES)) / 20000);
    // uint32_t duty = pulse_us *LEDC_FREQ_HZ *((1<<13)-1)/ 1e6;
    uint32_t max_duty = ((uint32_t)1 << LEDC_DUTY_RES) - 1;
    uint32_t duty = (uint32_t)((pulse_us * (uint64_t)max_duty) / 20000);
    ESP_LOGI(TAG, "Angle: %d°, Pulse: %d us, Duty: %u, Channel: %d", angle, pulse_us, duty, channel);
    ledc_set_duty(LEDC_MODE, channel, duty);
    ledc_update_duty(LEDC_MODE, channel);
}
// 兼容函数:默认控制垂直舵机(原实现)
static void set_servo_angle(int angle) {
    set_servo_angle_channel(angle, LEDC_CHANNEL);
}
static void do_retransmit(const int sock)
{
    int len;
    char rx_buffer[128];

    do {
        len = recv(sock, rx_buffer, sizeof(rx_buffer) - 1, 0);
        if (len < 0) {
            ESP_LOGE(TAG, "Error occurred during receiving: errno %d", errno);
        } else if (len == 0) {
            ESP_LOGW(TAG, "Connection closed");
        } else {
            rx_buffer[len] = 0; // Null-terminate
            ESP_LOGI(TAG, "Received %d bytes: '%s'", len, rx_buffer);

            bool is_angle_command = false;

            // 尝试将接收到的内容解析为整数(角度)
            // 支持命令格式:"H<angle>" 控制水平舵机,"V<angle>" 控制垂直舵机,
            // 以及兼容旧的纯数字(视为垂直舵机角度)
            char *s = rx_buffer;
            // 跳过前导空白
            while (*s && isspace((unsigned char)*s)) s++;
            if (*s == 'H' || *s == 'h') {
                s++;
                char *endptr;
                long angle = strtol(s, &endptr, 10);
                if (endptr != s && (*endptr == '\0' || isspace((unsigned char)*endptr))) {
                    if (angle >= 0 && angle <= 180) {
                        ESP_LOGI(TAG, "Setting H servo to angle: %ld°", angle);
                        set_servo_angle_channel((int)angle, LEDC_CHANNEL_H);
                        is_angle_command = true;
                    }
                }
            } else if (*s == 'V' || *s == 'v') {
                s++;
                char *endptr;
                long angle = strtol(s, &endptr, 10);
                if (endptr != s && (*endptr == '\0' || isspace((unsigned char)*endptr))) {
                    if (angle >= 0 && angle <= 180) {
                        ESP_LOGI(TAG, "Setting V servo to angle: %ld°", angle);
                        set_servo_angle((int)angle);
                        is_angle_command = true;
                        gpio_set_level(GPIO_OUTPUT_IO_2, (angle > 0) ? 1 : 0);
                    }
                }
            } else {
                // 尝试按原来方式解析纯数字(垂直舵机)
                char *endptr;
                long angle = strtol(s, &endptr, 10);
                if (endptr != s && (*endptr == '\0' || isspace((unsigned char)*endptr))) {
                    if (angle >= 0 && angle <= 180) {
                        ESP_LOGI(TAG, "Setting servo to angle: %ld°", angle);
                        set_servo_angle((int)angle);
                        is_angle_command = true;
                        gpio_set_level(GPIO_OUTPUT_IO_2, (angle > 0) ? 1 : 0);
                    }
                }
            }
            // 如果不是有效角度指令,则回显原数据
            if (!is_angle_command) {
                ESP_LOGI(TAG, "Not a valid angle command, echoing back");
                int to_write = len;
                while (to_write > 0) {
                    int written = send(sock, rx_buffer + (len - to_write), to_write, 0);
                    if (written < 0) {
                        ESP_LOGE(TAG, "Error sending: errno %d", errno);
                        return;
                    }
                    to_write -= written;
                }
            }

        }
    } while (len > 0);
}

static void tcp_server_task(void *pvParameters)
{
    char addr_str[128];
    int addr_family = (int)pvParameters;
    int ip_protocol = 0;
    int keepAlive = 1;
    int keepIdle = KEEPALIVE_IDLE;
    int keepInterval = KEEPALIVE_INTERVAL;
    int keepCount = KEEPALIVE_COUNT;
    struct sockaddr_storage dest_addr;

#ifdef CONFIG_EXAMPLE_IPV4
    if (addr_family == AF_INET) {
        struct sockaddr_in *dest_addr_ip4 = (struct sockaddr_in *)&dest_addr;
        dest_addr_ip4->sin_addr.s_addr = htonl(INADDR_ANY);
        dest_addr_ip4->sin_family = AF_INET;
        dest_addr_ip4->sin_port = htons(PORT);
        ip_protocol = IPPROTO_IP;
    }
#endif
#ifdef CONFIG_EXAMPLE_IPV6
    if (addr_family == AF_INET6) {
        struct sockaddr_in6 *dest_addr_ip6 = (struct sockaddr_in6 *)&dest_addr;
        bzero(&dest_addr_ip6->sin6_addr.un, sizeof(dest_addr_ip6->sin6_addr.un));
        dest_addr_ip6->sin6_family = AF_INET6;
        dest_addr_ip6->sin6_port = htons(PORT);
        ip_protocol = IPPROTO_IPV6;
    }
#endif

    int listen_sock = socket(addr_family, SOCK_STREAM, ip_protocol);
    if (listen_sock < 0) {
        ESP_LOGE(TAG, "Unable to create socket: errno %d", errno);
        vTaskDelete(NULL);
        return;
    }
    int opt = 1;
    setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#if defined(CONFIG_EXAMPLE_IPV4) && defined(CONFIG_EXAMPLE_IPV6)
    // Note that by default IPV6 binds to both protocols, it is must be disabled
    // if both protocols used at the same time (used in CI)
    setsockopt(listen_sock, IPPROTO_IPV6, IPV6_V6ONLY, &opt, sizeof(opt));
#endif

    ESP_LOGI(TAG, "Socket created");

    int err = bind(listen_sock, (struct sockaddr *)&dest_addr, sizeof(dest_addr));
    if (err != 0) {
        ESP_LOGE(TAG, "Socket unable to bind: errno %d", errno);
        ESP_LOGE(TAG, "IPPROTO: %d", addr_family);
        goto CLEAN_UP;
    }
    ESP_LOGI(TAG, "Socket bound, port %d", PORT);

    err = listen(listen_sock, 1);
    if (err != 0) {
        ESP_LOGE(TAG, "Error occurred during listen: errno %d", errno);
        goto CLEAN_UP;
    }


    while (1) {

        ESP_LOGI(TAG, "Socket listening");

        struct sockaddr_storage source_addr; // Large enough for both IPv4 or IPv6
        socklen_t addr_len = sizeof(source_addr);
        int sock = accept(listen_sock, (struct sockaddr *)&source_addr, &addr_len);
        if (sock < 0) {
            ESP_LOGE(TAG, "Unable to accept connection: errno %d", errno);
            break;
        }

        // Set tcp keepalive option
        setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &keepAlive, sizeof(int));
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &keepIdle, sizeof(int));
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &keepInterval, sizeof(int));
        setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &keepCount, sizeof(int));
        // Convert ip address to string
#ifdef CONFIG_EXAMPLE_IPV4
        if (source_addr.ss_family == PF_INET) {
            inet_ntoa_r(((struct sockaddr_in *)&source_addr)->sin_addr, addr_str, sizeof(addr_str) - 1);
        }
#endif
#ifdef CONFIG_EXAMPLE_IPV6
        if (source_addr.ss_family == PF_INET6) {
            inet6_ntoa_r(((struct sockaddr_in6 *)&source_addr)->sin6_addr, addr_str, sizeof(addr_str) - 1);
        }
#endif
        ESP_LOGI(TAG, "Socket accepted ip address: %s", addr_str);

        do_retransmit(sock);

        shutdown(sock, 0);
        close(sock);
    }

CLEAN_UP:
    close(listen_sock);
    vTaskDelete(NULL);
}

// ----- 新增:Wi-Fi 断线重连处理 -----
static int s_wifi_retry_num = 0;
static const int s_wifi_max_retries = 100;
static esp_event_handler_instance_t s_wifi_event_handler_instance;
static esp_event_handler_instance_t s_ip_event_handler_instance;

static void wifi_event_handler(void* arg, esp_event_base_t event_base,
                               int32_t event_id, void* event_data)
{
    if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
        if (s_wifi_retry_num < s_wifi_max_retries) {
            esp_err_t err = esp_wifi_connect();
            if (err == ESP_OK) {
                s_wifi_retry_num++;
                ESP_LOGI(TAG, "WiFi disconnected — retrying connect (%d/%d)", s_wifi_retry_num, s_wifi_max_retries);
            } else {
                ESP_LOGE(TAG, "esp_wifi_connect failed: %s", esp_err_to_name(err));
            }
        } else {
            ESP_LOGW(TAG, "Reached max WiFi reconnect attempts (%d)", s_wifi_max_retries);
        }
    }
}

static void got_ip_event_handler(void* arg, esp_event_base_t event_base,
                                 int32_t event_id, void* event_data)
{
    if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
        ESP_LOGI(TAG, "Got IP address — reset WiFi retry counter");
        s_wifi_retry_num = 0;
    }
}

void app_main(void)
{


    ESP_ERROR_CHECK(nvs_flash_init());
    ESP_ERROR_CHECK(esp_netif_init());
    ESP_ERROR_CHECK(esp_event_loop_create_default());

    /* 注册自定义的 Wi-Fi / IP 事件处理器以支持断线重连逻辑 */
    ESP_ERROR_CHECK(esp_event_handler_instance_register(WIFI_EVENT,
                                                        WIFI_EVENT_STA_DISCONNECTED,
                                                        &wifi_event_handler,
                                                        NULL,
                                                        &s_wifi_event_handler_instance));
    ESP_ERROR_CHECK(esp_event_handler_instance_register(IP_EVENT,
                                                        IP_EVENT_STA_GOT_IP,
                                                        &got_ip_event_handler,
                                                        NULL,
                                                        &s_ip_event_handler_instance));

    /* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig.
     * Read "Establishing Wi-Fi or Ethernet Connection" section in
     * examples/protocols/README.md for more information about this function.
     */
    ESP_ERROR_CHECK(example_connect());
    // --- 新增:GPIO 初始化 ---
    gpio_reset_pin(GPIO_OUTPUT_IO_2);
    /* Set the GPIO as a push/pull output */
    gpio_set_direction(GPIO_OUTPUT_IO_2, GPIO_MODE_OUTPUT);
    ESP_LOGI(TAG, "GPIO2 is initialized as output");

    // 配置 LEDC 定时器
    ledc_timer_config_t timer_conf = {
        .speed_mode = LEDC_MODE,
        .timer_num  = LEDC_TIMER,
        .duty_resolution = LEDC_DUTY_RES,
        .freq_hz    = LEDC_FREQ_HZ,
        .clk_cfg    = LEDC_AUTO_CLK,
    };
    ESP_ERROR_CHECK(ledc_timer_config(&timer_conf));

    // 配置 LEDC 通道
    ledc_channel_config_t channel_conf = {
        .gpio_num   = SERVO_PIN,
        .speed_mode = LEDC_MODE,
        .channel    = LEDC_CHANNEL,
        .timer_sel  = LEDC_TIMER,
        .intr_type      = LEDC_INTR_DISABLE,
        .duty       = 0,
        .hpoint     = 0,
    };
    ESP_ERROR_CHECK(ledc_channel_config(&channel_conf));
    // 配置水平舵机通道
    ledc_channel_config_t channel_conf_h = {
        .gpio_num   = SERVO_PIN_H,
        .speed_mode = LEDC_MODE,
        .channel    = LEDC_CHANNEL_H,
        .timer_sel  = LEDC_TIMER,
        .intr_type  = LEDC_INTR_DISABLE,
        .duty       = 0,
        .hpoint     = 0,
    };
    ESP_ERROR_CHECK(ledc_channel_config(&channel_conf_h));
    // 初始位置:60°
    set_servo_angle(60);
    // 水平舵机初始为中位 90°
    set_servo_angle_channel(90, LEDC_CHANNEL_H);
    vTaskDelay(pdMS_TO_TICKS(1000));

#ifdef CONFIG_EXAMPLE_IPV4
    xTaskCreate(tcp_server_task, "tcp_server", 4096, (void*)AF_INET, 5, NULL);
#endif
#ifdef CONFIG_EXAMPLE_IPV6
    xTaskCreate(tcp_server_task, "tcp_server", 4096, (void*)AF_INET6, 5, NULL);
#endif
}

编译和烧录

在VS Code中: 代码copy进去之后,直接编译和烧录 别忘记在配置文件sdkconfig中设置正确的WiFi SSID和密码,新创建的项目,sdkconfig只有编译一次或sdk配置编辑器才会生成 alt text

烧录完成后,ESP32会自动连接WiFi并启动Web服务器。

注意,在监视器中查看IP地址,记录下ESP32的IP地址,后续需要在python代码中配置。

🔧 第四步:改造服务端python代码和手机APP

首先修改服务端python代码,添加接收来自APP的控制指令。 全量代码如下:

import cv2
import socket
import pickle
import struct
import threading
import time

import lz4.block
import zlib


def send_to_ptz(angle, is_vertical):
    """发送控制指令到云台"""
    PTZ_HOST = '192.168.31.114'
    PTZ_PORT = 3333

    try:
        # 创建socket连接
        ptz_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ptz_socket.settimeout(2)  # 设置超时时间
        ptz_socket.connect((PTZ_HOST, PTZ_PORT))

        # 构建消息格式: h<角度> 或 v<角度>
        if is_vertical:
            message = f"v{angle}"
        else:
            message = f"h{angle}"

        # 发送消息
        ptz_socket.sendall(message.encode('utf-8'))
        print(f"发送指令到云台: {message}")

        # 关闭连接
        ptz_socket.close()
        return True
    except Exception as e:
        print(f"发送指令到云台失败: {str(e)}")
        return False

import select

def receive_commands(client_socket, running):
    """接收并处理客户端指令的线程函数"""
    # 云台角度初始化
    vertical_angle = 60  # 默认垂直角度60度
    horizontal_angle = 90  # 默认水平角度90度
    step = 2  # 控制步长2度

    while running[0]:
        try:
            # 使用select模块检查socket是否有数据可读
            # 设置100ms的超时,避免无限阻塞
            ready_to_read, _, _ = select.select([client_socket], [], [], 0.1)

            if ready_to_read:
                # 有数据可读,接收指令
                command = client_socket.recv(1024).decode().strip()
                if not command:
                    # 客户端关闭连接
                    print("客户端关闭连接,退出指令接收线程")
                    running[0] = False
                    break

                logtime = time.asctime(time.localtime(time.time()))
                print(logtime, "收到客户端指令:", command)
                # 处理不同指令
                if command == "stop":
                    print("接收到停止指令,准备停止服务...")
                    running[0] = False
                elif command == "status":
                    # 发送状态信息
                    status_msg = f"Server is running. PTZ: Vertical={vertical_angle}°, Horizontal={horizontal_angle}°"
                    msg_size = struct.pack(">i", len(status_msg))
                    client_socket.sendall(msg_size)
                    client_socket.sendall(status_msg.encode())
                # 云台控制指令
                elif command == "up":
                    # 向上控制
                    vertical_angle = min(vertical_angle + step, 90)  # 最大90度
                    print(f"云台向上移动,当前垂直角度: {vertical_angle}°")
                    # 发送指令到云台
                    send_to_ptz(vertical_angle, True)
                elif command == "down":
                    # 向下控制
                    vertical_angle = max(vertical_angle - step, 55)  # 最小0度
                    print(f"云台向下移动,当前垂直角度: {vertical_angle}°")
                    # 发送指令到云台
                    send_to_ptz(vertical_angle, True)
                elif command == "left":
                    # 向左控制
                    horizontal_angle = max(horizontal_angle + step, 0)  # 最小0度
                    print(f"云台向左移动,当前水平角度: {horizontal_angle}°")
                    # 发送指令到云台
                    send_to_ptz(horizontal_angle, False)
                elif command == "right":
                    # 向右控制
                    horizontal_angle = min(horizontal_angle - step, 180)  # 最大180度
                    print(f"云台向右移动,当前水平角度: {horizontal_angle}°")
                    # 发送指令到云台
                    send_to_ptz(horizontal_angle, False)
                # 设置角度指令
                elif command.startswith("set_vertical="):
                    # 设置垂直角度
                    try:
                        angle = int(command.split("=")[1])
                        if 0 <= angle <= 90:
                            vertical_angle = angle
                            print(f"设置垂直角度为: {vertical_angle}°")
                            # 发送指令到云台
                            send_to_ptz(vertical_angle, True)
                        else:
                            print("垂直角度超出范围(0-90)°")
                    except ValueError:
                        print("垂直角度格式错误")
                elif command.startswith("set_horizontal="):
                    # 设置水平角度
                    try:
                        angle = int(command.split("=")[1])
                        if 0 <= angle <= 180:
                            horizontal_angle = angle
                            print(f"设置水平角度为: {horizontal_angle}°")
                            # 发送指令到云台
                            send_to_ptz(horizontal_angle, False)
                        else:
                            print("水平角度超出范围(0-180)°")
                    except ValueError:
                        print("水平角度格式错误")
                else:
                    print("未知指令:", command)
            else:
                # 无数据可读,短暂休眠后继续循环
                time.sleep(0.01)
        except OSError as e:
            # 处理操作系统错误
            if e.errno == 9:  # Bad file descriptor
                # socket已关闭,正常退出循环
                print("Socket已关闭,退出指令接收线程")
                break
            elif running[0]:  # 其他操作系统错误且线程仍在运行
                logtime = time.asctime(time.localtime(time.time()))
                print(logtime, "接收指令时出错:", str(e))
                print(f"错误编号: {e.errno}, 错误信息: {str(e)}")
                # 继续循环,而不是退出
                time.sleep(0.01)
        except Exception as e:
            if running[0]:  # 避免在正常关闭时打印错误
                logtime = time.asctime(time.localtime(time.time()))
                print(logtime, "接收指令时出错:", str(e))
                print(f"异常类型: {type(e).__name__}")
            # 继续循环,而不是退出
            time.sleep(0.01)

def handle_client(client_socket:socket):
    strip = client_socket.getpeername()
    logtime = time.asctime(time.localtime(time.time()))
    print(logtime, strip, "已连接,等待认证...")
    command = ""
    try:
        command = client_socket.recv(1024).decode()
    except Exception as e:
        print(logtime, "指令解析不正确", str(e))
        client_socket.close()
        return
    if command != "*******":
        print(command)
        print(logtime, "指令不正确", client_socket.getpeername())
        client_socket.close()
        return
    print(logtime, strip, "认证通过")

    # 创建运行标志(使用列表以便在不同线程中修改)
    running = [True]

    # 启动指令接收线程
    command_thread = threading.Thread(target=receive_commands, args=(client_socket, running))
    command_thread.daemon = True  # 设置为守护线程,主线程结束时自动退出
    command_thread.start()

    try:
        while running[0]:
            # 读取摄像头帧
            _, frame = camera.read()
            # image_format = frame.dtype
            #
            # # 打印图像颜色格式
            # print(image_format)

            # 添加水印
            watermark_text = time.asctime(time.localtime(time.time()))
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            color = (0, 0, 255)  # 红色
            thickness = 2
            text_size, _ = cv2.getTextSize(watermark_text, font, font_scale, thickness)
            text_position = (10, frame.shape[0] - 10)  # 水印位置
            cv2.putText(frame, watermark_text, text_position, font, font_scale, color, thickness)

            # 获取图像宽度和高度
            height, width, _ = frame.shape
            print(f"图像形状: {frame.shape}")
            # 对帧进行序列化
            qdata = frame.tobytes()
            data = zlib.compress(qdata)
            data_size = struct.pack(">i", len(data))
            # 发送帧大小
            client_socket.sendall(data_size)
            # 发送帧数据
            client_socket.sendall(data)
            time.sleep(0.04)
            # 按下ESC键退出
            key = cv2.waitKey(1) & 0xFF
            if key == 27:
                running[0] = False
                break

    except Exception as e:
        logTime = time.asctime(time.localtime(time.time()))
        print(logTime, "与客户端通信时出现错误:", str(e))

    finally:
        logTime = time.asctime(time.localtime(time.time()))
        print(logTime, "新客户端已断开:", strip)
        # 关闭客户端连接
        client_socket.close()


# 创建一个TCP/IP套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置 SO_REUSEADDR 选项
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('0.0.0.0', 9030))
server_socket.listen(5)

print("等待客户端连接...")

# 打开摄像头
# 打开摄像头
camera = None
for index in range(10):
    print(f"尝试打开摄像头 index: {index}")
    cap = cv2.VideoCapture(index)
    if cap.isOpened():
        camera = cap
        print(f"成功打开摄像头 index: {index}")
        break
    else:
        cap.release()

if camera is None:
    print("无法打开任何摄像头")
    exit(1)

# 设置摄像头的图像宽度和高度
camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
camera.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)
camera.set(cv2.CAP_PROP_FORMAT, cv2.CV_8UC1)
# 设置摄像头的属性
# camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('R', 'G', 'B', '3'))

# 设置图像格式
# 1. JPEG格式
camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'))
# 2. PNG格式
# cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('P', 'N', 'G'))

try:
    while True:
        try:

            # 接受客户端连接
            client_socket, address = server_socket.accept()
            # 创建新的线程来处理客户端
            client_thread = threading.Thread(target=handle_client, args=(client_socket,))
            client_thread.start()
        except Exception as e:
            logtime = time.asctime(time.localtime(time.time()))
            print(logtime, "与客户端通信时出现错误:", str(e))
except KeyboardInterrupt:
    pass

finally:
    # # 关闭连接
    # client_socket.close()
    server_socket.close()
    camera.release()
    cv2.destroyAllWindows()

现在需要修改之前的安卓APP,添加云台控制功能。

使用AI快速开发

如果你不会安卓开发,继续用Trae这个AI工具:

  1. 打开之前的APP项目
  2. 向AI说明需求:

    "请在现有的视频监控APP中添加一个云台控制面板,包含四个方向按钮和两个滑动条,分别控制摄像头的上下左右转动。控制指令通过HTTP请求发送到后端服务的IP地址,后端代码如下:把python服务代码给到AI

  3. AI会自动生成界面和逻辑代码
  4. 测试功能,有问题反馈给AI修正

🔧 第五步:整体组装

物理组装

  1. 安装云台:将3D打印的云台安装在合适位置(底盘、桌面、墙上等),我搞了一个比较重的底盘,要不然容易摔倒。
  2. 安装摄像头:将USB摄像头固定在云台上。
  3. 连接ESP32:将ESP32与舵机连接好,我用的是杜邦线,注意引脚对应关系。
  4. 供电:ESP32和舵机用独立USB电源
  5. 布线整理:用扎带或胶带固定线缆,保持整洁(非必要,我没做)

✨ 效果展示

组装完成后,打开手机APP: - 📱 实时视频画面 - 🎮 四个方向控制按钮(上下左右) - 📊 两个滑动条精确控制角度 - 🔄 实时响应,延迟小于500ms

现在,你的摄像头不再是"死"的了!轻轻一点,就能看到任何角度的画面。


⚠️ 注意事项

舵机保护

⚠️ 避免卡死:舵机长时间堵转会发热,甚至烧毁 ⚠️ 角度限制:要调整舵机力臂的角度,确定不要超出对应范围,舵机一般支持0-180度角度。但是垂直角度我只用到60-90度,超出范围会导致舵机变形。 ⚠️ 电源稳定:使用稳定电源,电压不足会导致舵机抖动(这个是很容易踩得坑,一开始用可调电源供电,可能质量不好,舵机一直摆动不正确并且抖动)

ESP32注意事项

⚠️ 共地连接:所有设备的GND必须连接在一起 ⚠️ PWM精度:50Hz频率,13位分辨率 ⚠️ WiFi连接:确保信号稳定,避免掉线


🎯 成本总结

项目 价格
ESP32开发板 18元
SG90舵机x2 10元
USB电源适配器 5-10元
3D打印材料 1-5元
总计 约40元

💡 进阶玩法

等你熟悉后,还可以尝试:

  • 🤖 自动巡航:定时自动转动摄像头扫描
  • 🔔 运动追踪:结合AI视觉自动追踪移动物体
  • 🎯 预设位置:保存常用角度,一键切换
  • 🌐 远程控制:通过公网远程操控
  • 📹 云台录像:移动中同时录像

🎉 总结

通过这个进阶项目,我们把一个简单的摄像头升级成了智能监控系统:

✅ 360度无死角监控
✅ 手机远程控制
✅ 精确角度调节
✅ 低成本实现
✅ 高度可定制

加上之前的摄像头系统,你现在拥有了一个功能强大的家庭监控解决方案!

最重要的是——所有数据都在你手中,隐私绝对安全


本文由 liubei 原创,转载请注明出处。

📖相关推荐