PC使用小爱同学远程唤醒开机配置方法

ZhangJian 2026-05-02 n次浏览 Python 编辑

目前使用很长一段时间下来,确实很实用方便就把经验给大家分享一下

做了一个小网页点击一下发送远程开机命令,觉得比较麻烦,最近换了小米的手机就可以使用小爱的API来控制

使用场景?
  1. 不在家,远程开机后连接使用,用完关闭更省电。
  2. 在家准备使用电脑,提前语音唤醒小爱同学打开电脑,相比走到电脑面前按下开机键,更加方便。
前提条件?
  1. 最好需要有NAS,用来接收指令、发送唤醒数据包
  2. PC在bios中打开网卡唤醒功能

唤醒数据包 一共102字节 开头6个字节为 FF FF FF FF FF FF PC的MAC地址 例(CC-33-55-66-AA-21) 重复20次 使用UDP向局域网发送广播数据包,BROADCAST改为局域网段,以255结尾,端口号7。

#!/soft/venv/ddns/bin/python

import socket
import time
import struct
from loguru import logger


def wake_up(mac='CC-33-55-66-AA-21'):
    MAC = mac
    BROADCAST = "192.168.5.255"
    if len(MAC) != 17:
        raise ValueError("MAC address should be set as form 'XX-XX-XX-XX-XX-XX'")
    mac_address = MAC.replace("-", '')
    data = ''.join(['FFFFFFFFFFFF', mac_address * 20])  # 构造原始数据格式
    send_data = b''

    # 把原始数据转换为16进制字节数组,
    for i in range(0, len(data), 2):
        send_data = b''.join([send_data, struct.pack('B', int(data[i: i + 2], 16))])
    # print(send_data)

    # 通过socket广播出去,为避免失败,间隔广播三次
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(send_data, (BROADCAST, 7))
        time.sleep(0.5)
        sock.sendto(send_data, (BROADCAST, 7))
        time.sleep(0.5)
        sock.sendto(send_data, (BROADCAST, 7))
        logger.info("电脑开机命令执行成功")
    except Exception as e:
        logger.error(e)

if __name__ == "__main__":
    wake_up('70-70-FC-07-AB-34')

小爱同学,个人开发者注册比较麻烦,推荐使用第三方平台 巴法云bemfa.com,方便快捷。

创建一个MQTT设备,006开关设备,在米家app添加第三方设备找到巴法云添加设备,代码中监听设备的消息,收到消息发送开机数据包。

deepseek写了一版直接可以使用,修改密钥和设备id,修改要唤醒的网卡mac地址。

# -*- coding: utf-8 -*-
"""
巴法云 MQTT 
"""

import time
import signal
import sys
import json
from typing import Optional, Callable

from pc_wake_up import wake_up

from loguru import logger
import paho.mqtt.client as mqtt

# ========== 配置区域 ==========
MQTT_HOST = "bemfa.com"
MQTT_PORT = 9501
CLIENT_ID = ""   # 你的私钥
USERNAME = ""
PASSWORD = ""
USE_SSL = False
KEEPALIVE = 60
QOS = 1
ENABLE_STATE_REPORT = True          # 上报状态以同步云端 / 音箱



class BemfaSocketController:
    def __init__(self, host=MQTT_HOST, port=MQTT_PORT,
                 client_id=CLIENT_ID, topic=TOPIC,
                 username=USERNAME, password=PASSWORD,
                 use_ssl=USE_SSL, keepalive=KEEPALIVE,
                 qos=QOS, enable_state_report=ENABLE_STATE_REPORT):
        self.host = host
        self.port = port
        self.client_id = client_id
        self.topic = topic
        self.username = username
        self.password = password
        self.use_ssl = use_ssl
        self.keepalive = keepalive
        self.qos = qos
        self.enable_state_report = enable_state_report

        self.client: Optional[mqtt.Client] = None
        self.current_state: Optional[str] = None
        self.running = True
        self._reconnect_delay = 1
        self._max_reconnect_delay = 60
        self._on_state_change_callback: Optional[Callable] = None

        # 简易去重:记录最近处理过的消息内容+时间
        self._last_msg_hash = ""
        self._last_msg_time = 0

    def set_on_state_change_callback(self, callback: Callable[[str], None]):
        self._on_state_change_callback = callback

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            logger.info(f"✅ MQTT连接成功 ({self.host}:{self.port})")
            self._reconnect_delay = 1
            client.subscribe(self.topic, qos=self.qos)
            logger.info(f"? 已订阅主题: {self.topic}")
        else:
            logger.error(f"❌ 连接失败 rc={rc}")
            if rc == 4:
                logger.error("请检查私钥(CLIENT_ID)")

    def _on_message(self, client, userdata, msg):
        payload = msg.payload.decode('utf-8').strip()
        logger.info(f"? 收到消息: {payload}")

        # ---------- 简易去重(防止同一秒内重复处理相同消息)----------
        msg_hash = f"{msg.topic}|{payload}"
        now = time.time()
        if msg_hash == self._last_msg_hash and (now - self._last_msg_time) < 1.0:
            logger.debug("⏩ 忽略重复消息(1秒内相同内容)")
            return
        self._last_msg_hash = msg_hash
        self._last_msg_time = now

        # ---------- 处理 JSON 格式(带来源标识)----------
        if payload.startswith("{") and payload.endswith("}"):
            try:
                data = json.loads(payload)
                if data.get("source") == self.client_id:
                    logger.debug("? 忽略自己上报的状态消息")
                    return
                cmd = data.get("cmd")
                if cmd in ("on", "off"):
                    self._execute_command(cmd)
                else:
                    logger.warning(f"未知JSON指令: {cmd}")
                return
            except Exception:
                pass  # 不是合法 JSON,继续按纯文本处理

        # ---------- 处理纯文本 on / off(兼容小爱、天猫等第三方)----------
        if payload in ("on", "off"):
            self._execute_command(payload)
        else:
            logger.warning(f"未知指令: {payload}")

    def _execute_command(self, command: str):
        """执行开关指令 + (可选)上报状态"""
        logger.info(f"⚡ 执行插座{command}动作")
        self.current_state = command

        # 1. 调用用户硬件控制回调
        if self._on_state_change_callback:
            self._on_state_change_callback(command)

        # 2. 上报状态(用于云端 / 音箱同步)
        if self.enable_state_report:
            self._report_state_with_source(command)

    def _report_state_with_source(self, state: str):
        """
        上报状态(带上自己的 client_id 作为 source),
        这样自己收到后能识别并忽略,避免循环。
        """
        if not self.client:
            logger.error("客户端未连接,无法上报状态")
            return
        try:
            payload = json.dumps({
                "cmd": state,
                "source": self.client_id
            })
            result = self.client.publish(self.topic, payload, qos=self.qos, retain=True)
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.debug(f"? 状态上报成功: {state}")
            else:
                logger.warning(f"状态上报失败, rc={result.rc}")
        except Exception as e:
            logger.error(f"状态上报异常: {e}")

    def _on_disconnect(self, client, userdata, rc):
        logger.warning(f"⚠️ MQTT断开连接 rc={rc}")
        if rc != 0 and self.running:
            self._reconnect()

    def _on_publish(self, client, userdata, mid):
        logger.debug(f"✅ 消息发布成功 mid={mid}")

    def _reconnect(self):
        while self.running:
            try:
                logger.info(f"? 重连中 (延迟{self._reconnect_delay}s)...")
                time.sleep(self._reconnect_delay)
                if self._connect():
                    break
                self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
            except Exception as e:
                logger.error(f"重连异常: {e}")
                time.sleep(self._reconnect_delay)

    def _connect(self) -> bool:
        try:
            self.client = mqtt.Client(client_id=self.client_id, clean_session=True)
            if self.username or self.password:
                self.client.username_pw_set(self.username, self.password)
            self.client.on_connect = self._on_connect
            self.client.on_message = self._on_message
            self.client.on_disconnect = self._on_disconnect
            self.client.on_publish = self._on_publish
            if self.use_ssl:
                self.client.tls_set(cert_reqs=mqtt.ssl.CERT_NONE)
                self.client.tls_insecure_set(True)
            self.client.connect(self.host, self.port, self.keepalive)
            return True
        except Exception as e:
            logger.error(f"连接失败: {e}")
            return False

    def start(self):
        if not self._connect():
            logger.error("初始连接失败,程序退出")
            return
        self.client.loop_forever()

    def start_non_blocking(self):
        if not self._connect():
            return False
        self.client.loop_start()
        return True

    def stop(self):
        self.running = False
        if self.client:
            self.client.loop_stop()
            self.client.disconnect()
        logger.info("客户端已停止")

    def publish_command(self, command: str, retain: bool = True):
        """外部手动发布指令(纯文本,不携带source,会被当作外部指令)"""
        if not self.client:
            logger.error("客户端未连接")
            return False
        result = self.client.publish(self.topic, command, qos=self.qos, retain=retain)
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            logger.info(f"? 发布指令: {command}")
            return True
        else:
            logger.error(f"发布失败, rc={result.rc}")
            return False

    def get_current_state(self) -> Optional[str]:
        return self.current_state


def signal_handler(sig, frame):
    logger.info("收到退出信号,正在清理...")
    if 'controller' in globals():
        controller.stop()
    sys.exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    def hardware_control(state: str):
        if state == "on":
            logger.info("接收到开机命令")
            wake_up('70-70-FC-07-AB-34')
            logger.info("已执行开机命令")
        # elif state == "off":
        #     pass
    controller = BemfaSocketController()
    controller.set_on_state_change_callback(hardware_control)

    try:
        controller.start()
    except KeyboardInterrupt:
        controller.stop()

统计
文章数目 :
总字数 :
建立时长 :