目前使用很长一段时间下来,确实很实用方便就把经验给大家分享一下
做了一个小网页点击一下发送远程开机命令,觉得比较麻烦,最近换了小米的手机就可以使用小爱的API来控制
使用场景?
- 不在家,远程开机后连接使用,用完关闭更省电。
- 在家准备使用电脑,提前语音唤醒小爱同学打开电脑,相比走到电脑面前按下开机键,更加方便。
前提条件?
- 最好需要有NAS,用来接收指令、发送唤醒数据包
- PC在bios中打开网卡唤醒功能
唤醒数据包 一共102字节 开头6个字节为 FF FF FF FF FF FF PC的MAC地址 例(CC-33-55-66-AA-21) 重复20次 使用UDP向局域网发送广播数据包,BROADCAST改为局域网段,以255结尾,端口号7。
#!/soft/venv/ddns/bin/python
import socket
import time
import struct
from loguru import logger
def wake_up(mac='CC-33-55-66-AA-21'):
MAC = mac
BROADCAST = "192.168.5.255"
if len(MAC) != 17:
raise ValueError("MAC address should be set as form 'XX-XX-XX-XX-XX-XX'")
mac_address = MAC.replace("-", '')
data = ''.join(['FFFFFFFFFFFF', mac_address * 20]) # 构造原始数据格式
send_data = b''
# 把原始数据转换为16进制字节数组,
for i in range(0, len(data), 2):
send_data = b''.join([send_data, struct.pack('B', int(data[i: i + 2], 16))])
# print(send_data)
# 通过socket广播出去,为避免失败,间隔广播三次
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(send_data, (BROADCAST, 7))
time.sleep(0.5)
sock.sendto(send_data, (BROADCAST, 7))
time.sleep(0.5)
sock.sendto(send_data, (BROADCAST, 7))
logger.info("电脑开机命令执行成功")
except Exception as e:
logger.error(e)
if __name__ == "__main__":
wake_up('70-70-FC-07-AB-34')
小爱同学,个人开发者注册比较麻烦,推荐使用第三方平台 巴法云bemfa.com,方便快捷。
创建一个MQTT设备,006开关设备,在米家app添加第三方设备找到巴法云添加设备,代码中监听设备的消息,收到消息发送开机数据包。


deepseek写了一版直接可以使用,修改密钥和设备id,修改要唤醒的网卡mac地址。
# -*- coding: utf-8 -*-
"""
巴法云 MQTT
"""
import time
import signal
import sys
import json
from typing import Optional, Callable
from pc_wake_up import wake_up
from loguru import logger
import paho.mqtt.client as mqtt
# ========== 配置区域 ==========
MQTT_HOST = "bemfa.com"
MQTT_PORT = 9501
CLIENT_ID = "" # 你的私钥
USERNAME = ""
PASSWORD = ""
USE_SSL = False
KEEPALIVE = 60
QOS = 1
ENABLE_STATE_REPORT = True # 上报状态以同步云端 / 音箱
class BemfaSocketController:
def __init__(self, host=MQTT_HOST, port=MQTT_PORT,
client_id=CLIENT_ID, topic=TOPIC,
username=USERNAME, password=PASSWORD,
use_ssl=USE_SSL, keepalive=KEEPALIVE,
qos=QOS, enable_state_report=ENABLE_STATE_REPORT):
self.host = host
self.port = port
self.client_id = client_id
self.topic = topic
self.username = username
self.password = password
self.use_ssl = use_ssl
self.keepalive = keepalive
self.qos = qos
self.enable_state_report = enable_state_report
self.client: Optional[mqtt.Client] = None
self.current_state: Optional[str] = None
self.running = True
self._reconnect_delay = 1
self._max_reconnect_delay = 60
self._on_state_change_callback: Optional[Callable] = None
# 简易去重:记录最近处理过的消息内容+时间
self._last_msg_hash = ""
self._last_msg_time = 0
def set_on_state_change_callback(self, callback: Callable[[str], None]):
self._on_state_change_callback = callback
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info(f"✅ MQTT连接成功 ({self.host}:{self.port})")
self._reconnect_delay = 1
client.subscribe(self.topic, qos=self.qos)
logger.info(f"? 已订阅主题: {self.topic}")
else:
logger.error(f"❌ 连接失败 rc={rc}")
if rc == 4:
logger.error("请检查私钥(CLIENT_ID)")
def _on_message(self, client, userdata, msg):
payload = msg.payload.decode('utf-8').strip()
logger.info(f"? 收到消息: {payload}")
# ---------- 简易去重(防止同一秒内重复处理相同消息)----------
msg_hash = f"{msg.topic}|{payload}"
now = time.time()
if msg_hash == self._last_msg_hash and (now - self._last_msg_time) < 1.0:
logger.debug("⏩ 忽略重复消息(1秒内相同内容)")
return
self._last_msg_hash = msg_hash
self._last_msg_time = now
# ---------- 处理 JSON 格式(带来源标识)----------
if payload.startswith("{") and payload.endswith("}"):
try:
data = json.loads(payload)
if data.get("source") == self.client_id:
logger.debug("? 忽略自己上报的状态消息")
return
cmd = data.get("cmd")
if cmd in ("on", "off"):
self._execute_command(cmd)
else:
logger.warning(f"未知JSON指令: {cmd}")
return
except Exception:
pass # 不是合法 JSON,继续按纯文本处理
# ---------- 处理纯文本 on / off(兼容小爱、天猫等第三方)----------
if payload in ("on", "off"):
self._execute_command(payload)
else:
logger.warning(f"未知指令: {payload}")
def _execute_command(self, command: str):
"""执行开关指令 + (可选)上报状态"""
logger.info(f"⚡ 执行插座{command}动作")
self.current_state = command
# 1. 调用用户硬件控制回调
if self._on_state_change_callback:
self._on_state_change_callback(command)
# 2. 上报状态(用于云端 / 音箱同步)
if self.enable_state_report:
self._report_state_with_source(command)
def _report_state_with_source(self, state: str):
"""
上报状态(带上自己的 client_id 作为 source),
这样自己收到后能识别并忽略,避免循环。
"""
if not self.client:
logger.error("客户端未连接,无法上报状态")
return
try:
payload = json.dumps({
"cmd": state,
"source": self.client_id
})
result = self.client.publish(self.topic, payload, qos=self.qos, retain=True)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.debug(f"? 状态上报成功: {state}")
else:
logger.warning(f"状态上报失败, rc={result.rc}")
except Exception as e:
logger.error(f"状态上报异常: {e}")
def _on_disconnect(self, client, userdata, rc):
logger.warning(f"⚠️ MQTT断开连接 rc={rc}")
if rc != 0 and self.running:
self._reconnect()
def _on_publish(self, client, userdata, mid):
logger.debug(f"✅ 消息发布成功 mid={mid}")
def _reconnect(self):
while self.running:
try:
logger.info(f"? 重连中 (延迟{self._reconnect_delay}s)...")
time.sleep(self._reconnect_delay)
if self._connect():
break
self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
except Exception as e:
logger.error(f"重连异常: {e}")
time.sleep(self._reconnect_delay)
def _connect(self) -> bool:
try:
self.client = mqtt.Client(client_id=self.client_id, clean_session=True)
if self.username or self.password:
self.client.username_pw_set(self.username, self.password)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
self.client.on_publish = self._on_publish
if self.use_ssl:
self.client.tls_set(cert_reqs=mqtt.ssl.CERT_NONE)
self.client.tls_insecure_set(True)
self.client.connect(self.host, self.port, self.keepalive)
return True
except Exception as e:
logger.error(f"连接失败: {e}")
return False
def start(self):
if not self._connect():
logger.error("初始连接失败,程序退出")
return
self.client.loop_forever()
def start_non_blocking(self):
if not self._connect():
return False
self.client.loop_start()
return True
def stop(self):
self.running = False
if self.client:
self.client.loop_stop()
self.client.disconnect()
logger.info("客户端已停止")
def publish_command(self, command: str, retain: bool = True):
"""外部手动发布指令(纯文本,不携带source,会被当作外部指令)"""
if not self.client:
logger.error("客户端未连接")
return False
result = self.client.publish(self.topic, command, qos=self.qos, retain=retain)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"? 发布指令: {command}")
return True
else:
logger.error(f"发布失败, rc={result.rc}")
return False
def get_current_state(self) -> Optional[str]:
return self.current_state
def signal_handler(sig, frame):
logger.info("收到退出信号,正在清理...")
if 'controller' in globals():
controller.stop()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def hardware_control(state: str):
if state == "on":
logger.info("接收到开机命令")
wake_up('70-70-FC-07-AB-34')
logger.info("已执行开机命令")
# elif state == "off":
# pass
controller = BemfaSocketController()
controller.set_on_state_change_callback(hardware_control)
try:
controller.start()
except KeyboardInterrupt:
controller.stop()
