抖音快手直播礼物获取后续实用介绍之硬件关联

查看 85|回复 10
作者:zhengduimen   



这种直播大家是否都看到过?
那么具体这东西 怎么做的?
不考虑别的 看一会直播就清楚了。。
直播画面上写的很清楚
送 xx  礼物 推到某一列的砖块
具体怎么推的
大家可以看下 礼物 前边有个 柱状体 上边 摆了 几个 东西   
这东西是啥?
电磁铁  淘宝  搜 推拉式电磁铁 就是这玩意  
然后 柱状体内部 按常理 还有个设备  一般用的 是锁控板 之类的东西 或者 干脆做成控制器
然后 电脑上 通过  检测收到的 礼物 触发 控制器 推送 电磁铁
然后 就
砰 ……  
砖块就倒了
而 这里 其实 最关键的 就两步
获取礼物信息  
触发控制器执行
简单两件事 就可以创业 了。  嗨皮不嗨皮 ?
那么下面我们再来说下 礼物怎么获取 ?
抖音 接口文档上 从 2025年3月吧 取消了 直播间 接口的 申请 。以后大概率也不会有了。但是开通了一个玩法功能,这个东西 我不大会搞。
但是 手机上  网页上 PC 端 大家都能看到送礼物信息吧。。想想办法 一样能获取到。但是 注意 别破解人家的接口,那玩意费时费力,而且人家稍微改动 一点 你这边跟着就废了老鼻子劲儿 得跟着更新了。
除了 还有方案 例如 无障碍  、 例如 dom 获取 等等 比较多
上述描述的是 礼物信息的获取
下面 我们主要来说下 怎么触发 控制器 控制 电磁铁
这里边就用到了简单的物联网相关的东西
首先 程序肯定要有
程序里具体写什么 ?
串口 这是必须的 无论你用的是 java  kotlin  python  go 等等 这些 任何语言 只要你关键到上述直播间的 硬件设备 ,必然离不开串口控制
那么串口里 需要啥?
需要啥?需要啥?……
解释下
串口地址 :ttys xxxxxx   例如 ttys0   ttys1  或者在win上 COM1、 COM2  等等 具体 那个串口地址能用 那就得看实际的 控制器 也就是内部的 锁控板 具体设定了。
波特率 :115200  19200  xxxxx  等等 这个当然也是锁控板 定好的。
只要上述这俩 你获取 拿到 并正确连接 ?
那是不是就OK了?
当然是了。
只不过还需要更进一步 。
那就是怎么触发?
这里就涉及到串口指令了。
每一个 锁控板 它都有自己固定的 触发指令
本来想 加一张锁控板的 图 结果 不知道为啥 提示添加成功 但是图 上不来 ,我能怎么办?我也很无奈。
---
发帖前最后尝试了一次 图传上来了



大家可以看到 上方 有8个口 这些其实 不重要 因为控制板 有各种规则  单口 多口 常规的 一般 是 6 路  、12 路、 24 路、 这些
这些口 每一个 从上方直播画面来说 就对应着 一个一个具体的礼物
例如 人气票 对应 口 1 、墨镜 对应 口 2 这些 自己配置设定好 就ok
-----
放弃了 接着说
而触发 锁控板执行 进而 触发电磁铁 执行的 关键 就在这条指令  例如 我的指令  人气票礼物触发:5501A10200
这条指令并不完善还要经过 异或校验 之后 才能成为完整 指令
当然这些都是 嵌入式工程师 预先写入的 如果有文档 大家根据文档 来就ok 不用过多纠结 。
总之 当你 能收到 礼物  ,连上 硬件,那么剩下的事情就只剩下业务逻辑处理了
而这些最不是事情 。
对广大坛友来说
应该是 so easy
最后 程序呢 我也写了 发到了原创区
但是 ,由于 额  由于 部分原因吧 。跟社区规则不是很符合
理由  原创区不得发布多种方式限制的程序   然后 很不行 被  风之大大 给 k 掉了 。。。。。
所以 换了种 方式 我仅仅介绍 下 这种直播方式的 实现流程
按理来说 这次 应该不会违规了。
说了这么多 代码 我也贴上一部分。
仅有pc 部分功能
以免违规
以下 为程序 主窗口类  并不能运行 只是介绍了下 内部 有那些控件 简单执行了那些逻辑
大家可以作为参考 简单 看下 或许 你会有更好的想法
liveMain.py  
[Python] 纯文本查看 复制代码import sys
import asyncio
from datetime import datetime
import re
from PyQt5.QtGui import QTextCursor, QImage, QTextImageFormat,QPixmap
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QMetaObject, pyqtSlot, QObject,QTimer
from PyQt5.QtWidgets import (QApplication, QWidget, QLabel, QPushButton,
                            QCheckBox,
                            QLineEdit,
                            QComboBox,
                            QTextEdit, QVBoxLayout, QHBoxLayout, QMessageBox)
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest
from WebSocketManager import WebSocketManager
import logging
from logging.handlers import TimedRotatingFileHandler
import time
import os, json,tempfile,requests
import subprocess
import platform
from datetime import datetime, timedelta
import serial.tools.list_ports  # 获取串口列表
from io import BytesIO
from Updater import Updater
import uuid
from LockSettingsWidget import LockSettingsWidget   # 顶部
from SerialManager import serial_manager
from HttpServer import HttpServer
from tts_manager import TTSManager
from app_state import AppState
# 获取当前程序目录
if getattr(sys, 'frozen', False):
    # 打包后的可执行文件运行
    base_dir = os.path.dirname(sys.executable)
else:
    # 开发环境中运行
    base_dir = os.path.dirname(__file__)
   
# 获取串口列表 start
def get_available_ports():
    """获取所有可用的串口列表"""
    ports = []
    try:
        for port in serial.tools.list_ports.comports():
            ports.append(f"{port.device} - {port.description}")
    except Exception as e:
        logger.error(f"liveMain 获取串口列表失败: {str(e)}")
    return ports
# 获取串口列表 end
# 激活码存取 相关 start
# ACTIVATE_FILE = os.path.join(os.path.dirname(__file__), "activated.json")   # 就在程序同级目录
# ACTIVATE_FILE = os.path.join(os.path.dirname(sys.executable), "activated.json")   # 就在程序同级目录
# if getattr(sys, 'frozen', False):
#     # 打包后的可执行文件运行
#     ACTIVATE_FILE = os.path.join(os.path.dirname(sys.executable), "activated.json")   # 就在程序同级目录
# else:
#     # 开发环境中运行
#     ACTIVATE_FILE = os.path.join(os.path.dirname(__file__), "activated.json")
   
ACTIVATE_PATH = os.path.join(base_dir, "data")
os.makedirs(ACTIVATE_PATH, exist_ok=True)  # 如果不存在就创建
ACTIVATE_FILE = os.path.join(ACTIVATE_PATH, "actionlog.json")
def save_activate_flag(code: str):
    """把激活码和有效期写进本地文件"""
    with open(ACTIVATE_FILE, "w", encoding="utf-8") as f:
        json.dump(
            {
                "activated": True,
                "act_code": code,            # 激活码
                "expire": (datetime.today() + timedelta(days=3650)).strftime("%Y-%m-%d")        # 有效期字符串,如 "2025-12-31"
            },
            f,
            ensure_ascii=False,
            indent=2
        )
def load_activate_flag():
    """返回 (activated, act_code, expire) 三元组;文件不存在/异常则返回 False,None,None"""
    if not os.path.exists(ACTIVATE_FILE):
        return False, None, None
    try:
        with open(ACTIVATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        activated = data.get("activated", False)
        act_code = data.get("act_code")
        expire = data.get("expire")
        # 简单校验有效期
        if activated and expire and datetime.strptime(expire, "%Y-%m-%d") >= datetime.today():
            return True, act_code, expire
    except Exception:
        pass
    return False, None, None
# 激活码存取 相关 end

# 日志相关 start
# 创建 logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# # 文件名模板
# # log_file =  os.path.join(os.path.dirname(__file__), "actionlog.txt")
# # 文件名模板
# log_file =  os.path.join(os.path.dirname(sys.executable), "actionlog.txt")
# if getattr(sys, 'frozen', False):
#     # 打包后的可执行文件运行
#     log_file = os.path.join(os.path.dirname(sys.executable), "actionlog.txt")
# else:
#     # 开发环境中运行
#     log_file = os.path.join(os.path.dirname(__file__), "actionlog.txt")
# 确保 log 文件夹存在
log_dir = os.path.join(base_dir, "log")
os.makedirs(log_dir, exist_ok=True)  # 如果不存在就创建
# 最终日志文件路径
log_file = os.path.join(log_dir, "actionlog.txt")
# 使用 TimedRotatingFileHandler,每 10 分钟切割一次
handler = TimedRotatingFileHandler(
    filename=log_file,
    when="M",      # 按分钟切割
    interval=10,   # 每 10 分钟
    backupCount=0, # 保留旧日志文件数量,0 表示无限
    encoding="utf-8",
    utc=False
)
# 文件名时间戳格式,例如:actionlog.txt -> actionlog.txt.2025-08-21_10-00
handler.suffix = "%Y-%m-%d_%H-%M"
# 设置日志格式
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info(f"日志初始化:{log_file}")
# 日志相关 end
# 获取设备唯一编码 start
def get_motherboard_serial():
    """获取设备唯一编码(支持 Windows / macOS / Linux)"""
    system = platform.system()
    try:
        if system == "Windows":
            def run_ps(command: str) -> str:
                try:
                    result = subprocess.check_output(
                        ["powershell", "-NoProfile", "-Command", command],
                        stderr=subprocess.STDOUT
                    )
                    return 'live-'+result.decode("utf-8", errors="ignore").strip()
                except Exception:
                    return ""
            # 1. 主板序列号
            serial = run_ps("(Get-CimInstance Win32_BaseBoard).SerialNumber")
            if serial:
                return serial
            # 2. BIOS 序列号
            serial = run_ps("(Get-CimInstance Win32_BIOS).SerialNumber")
            if serial:
                return serial
            # 3. CPU ID
            serial = run_ps("(Get-CimInstance Win32_Processor).ProcessorId")
            if serial:
                return serial
            # 4. 兜底:持久 GUID
            path = os.path.join(os.getenv("APPDATA") or ".", "device_guid.txt")
            if os.path.exists(path):
                with open(path, "r") as f:
                    guid = f.read().strip()
            else:
                guid = str(uuid.uuid4())
                with open(path, "w") as f:
                    f.write(guid)
            return f"GUID:{guid}"
        elif system == "Darwin":  # macOS
            serial = None
            # 1. system_profiler(优先)
            try:
                result = subprocess.check_output(
                    ["system_profiler", "SPHardwareDataType"],
                    encoding="utf-8", errors="ignore"
                )
                for line in result.splitlines():
                    if "Serial Number" in line:
                        serial = line.split(":")[-1].strip()
                        break
            except Exception:
                pass
            # 2. ioreg(备用)
            if not serial:
                try:
                    result = subprocess.check_output(
                        ["ioreg", "-rd1", "-c", "IOPlatformExpertDevice"],
                        encoding="utf-8", errors="ignore"
                    )
                    for line in result.splitlines():
                        if "IOPlatformSerialNumber" in line:
                            serial = line.split("=")[-1].strip().strip('"')
                            break
                except Exception:
                    pass
            # 3. 兜底(uuid)
            if not serial:
                serial = str(uuid.getnode())
            return 'live-'+serial
        elif system == "Linux":
            try:
                with open("/sys/devices/virtual/dmi/id/board_serial", "r") as f:
                    serial = f.read().strip()
                return 'live-'+serial if serial else str(uuid.getnode())
            except Exception:
                return str(uuid.getnode())
        else:
            return "Unsupported OS"
    except Exception as e:
        return f"Error: {str(e)}"
# 获取设备唯一编码 end
# 异步线程类
class AsyncThread(QThread):
    error_occurred = pyqtSignal(str)
    service_stopped = pyqtSignal()
    service_started = pyqtSignal()
    thread_ready = pyqtSignal()
    def __init__(self, ws_manager):
        super().__init__()
        self.ws_manager = ws_manager
        self.loop = None
        self.task_queue = None
        self.running = False
        self.pending_task = None
        
        self.filter_flags = {
            "礼物": False,
            "点赞": False,
            "点亮": False,
            "弹幕": False
        }
        self.live_pts = {
            "抖音": False,
            "快手": False,
        }
    async def _process_tasks(self):
        while self.running:
            try:
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                try:
                    await task
                    if hasattr(task, '__name__'):
                        if task.__name__ == "start":
                            self.service_started.emit()
                        elif task.__name__ == "stop":
                            self.service_stopped.emit()
                except Exception as e:
                    self.error_occurred.emit(f"任务执行错误: {str(e)}")
                finally:
                    self.task_queue.task_done()
            except asyncio.TimeoutError:
                continue
    def run(self):
        self.running = True
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.task_queue = asyncio.Queue()
            
            # 线程就绪信号
            self.thread_ready.emit()
            
            if self.pending_task:
                self.loop.call_soon_threadsafe(
                    lambda: asyncio.ensure_future(self.task_queue.put(self.pending_task), loop=self.loop)
                )
                self.pending_task = None
            
            self.loop.run_until_complete(self._process_tasks())
        except Exception as e:
            self.error_occurred.emit(f"事件循环错误: {str(e)}")
        finally:
            if self.loop:
                self.loop.close()
            self.task_queue = None
            self.running = False
    def add_task(self, coroutine):
        if self.running and self.loop and self.task_queue:
            asyncio.run_coroutine_threadsafe(self.task_queue.put(coroutine), self.loop)
        elif self.running:
            self.pending_task = coroutine
        else:
            self.error_occurred.emit("无法添加任务,线程未运行")
            if asyncio.iscoroutine(coroutine):
                coroutine.close()
    def stop(self):
        if self.running:
            self.running = False
            if self.isRunning():
                self.wait()
# 主窗口类
class MyWindow(QWidget):
    # 新增一个信号用于在主线程中转发消息
    forward_message = pyqtSignal(str)
   
    def __init__(self):
        super().__init__()
        self.ws_manager = WebSocketManager()
        self.async_thread = None
        self.ws_running = False
        self.deviceId = get_motherboard_serial()
        # 关键修复:连接转发信号
        self.forward_message.connect(self.append_message)
        
        # 确保WebSocketManager的信号正确连接到转发信号
        self.ws_manager.message_received.connect(self.on_message_received)
        self.ws_manager.gift_list_received.connect(self.on_gift_list_received)
        self.ws_manager.chat_gift_received.connect(self.on_chat_gift_received)
        self.code = None
        self.initUI()
        
        self.updater = Updater(self)
        
        self.http_server = HttpServer()
        
   
    def check_update_version(self):
      
        has_update, latest, url, log = self.updater.check_update()
        if not has_update:
            QMessageBox.warning(self,"提示", "已是最新版")   
            return
        ok = QMessageBox.question(
            self, "发现新版本",
            f"检测到新版本 {latest}\n更新内容:\n{log}\n是否立即更新?",
            QMessageBox.Yes | QMessageBox.No
        )
        if ok != QMessageBox.Yes:
            return
        # 1. 把参数写 JSON
        cfg = {
            "download_url": url,
            "target_dir": os.path.dirname(sys.executable),
            "exe_name": "liveMain.exe"
        }
        # 写 JSON
        cfg_path = os.path.join(tempfile.gettempdir(), "liveMain_update.json")
        with open(cfg_path, "w", encoding="utf-8") as f:
            json.dump(cfg, f)
        # 直接启动同目录下的 updater.exe
        from PyQt5.QtCore import QProcess
        updater_exe = os.path.join(os.path.dirname(sys.executable), "updater.exe")
        QProcess.startDetached(updater_exe)
        QApplication.quit()
    def initUI(self):
        self.setWindowTitle("直播互动控制器")
        self.resize(800, 600)
         # 添加串口选择区域
        self.serial_label = QLabel("串口选择:")
        self.serial_combo = QComboBox()
        self.refresh_serial_btn = QPushButton("刷新")
        self.connect_serial_btn = QPushButton("连接")
        self.btn_lock_cfg = QPushButton("设置")
        
         # 串口布局
        serial_layout = QHBoxLayout()
        serial_layout.addWidget(self.serial_label)
        serial_layout.addWidget(self.serial_combo)
        serial_layout.addWidget(self.refresh_serial_btn)
        serial_layout.addWidget(self.connect_serial_btn)
        serial_layout.addWidget(self.btn_lock_cfg)
        
        # WebSocket地址显示
        self.address_label = QLabel(f"服务地址: ws://{self.ws_manager.ip}:{self.ws_manager.port}")
        self.address_label.setStyleSheet("font-size:14px; color:#2c3e50; font-weight:bold;")
        self.deviceId_lable = QLabel(f"设备码:{self.deviceId}")
        # self.action_code = QLineEdit(f"请输入设备激活码")
        self.action_code = QLineEdit()
        self.action_code.setPlaceholderText("请输入设备激活码")
        self.action_code.setStyleSheet("QTextEdit{min-height: 25px; max-height: 25px;}")
        self.activate_btn = QPushButton("激活")
        self.activate_btn.setCheckable(True)    # 支持 checked/unchecked
        self.activate_btn.setFixedWidth(60)     # 宽度固定
        
        
        # 2. 水平布局:标签-弹性空间-按钮
        addr_layout = QHBoxLayout()
        addr_layout.addWidget(self.address_label)
        addr_layout.addWidget(self.deviceId_lable)
        addr_layout.addWidget(self.action_code)
        addr_layout.addStretch()                # 把按钮推到最右
        addr_layout.addWidget(self.activate_btn)
        
        
        # 消息显示区域
        self.msg_label = QLabel("消息记录:")
        self.show_gift  = QCheckBox("礼物")
        self.show_gift.setChecked(True)
        self.show_msg  = QCheckBox("聊天")
        self.show_msg.setChecked(True)
        self.msg_label_voice = QLabel("语音播报:")
        self.show_gift_voice  = QCheckBox("礼物")
        self.show_msg_voice  = QCheckBox("聊天")
        filter_msg_gif = QHBoxLayout()
        filter_msg_gif.addWidget(self.msg_label)
        filter_msg_gif.addWidget(self.show_gift)
        filter_msg_gif.addWidget(self.show_msg)
        filter_msg_gif.addWidget(self.msg_label_voice)
        filter_msg_gif.addWidget(self.show_gift_voice)
        filter_msg_gif.addWidget(self.show_msg_voice)
        filter_msg_gif.addStretch()                # 把按钮推到最右
        
        self.msg_display = QTextEdit()
        self.msg_display.setReadOnly(True)
        self.msg_display.setAcceptRichText(True)  # 显式启用富文本
        self.msg_display.setTextInteractionFlags(
            Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard
        )
        self.msg_display.setStyleSheet("background:#f8f9fa; font-family:Consolas; font-size:12px; padding:10px;")
        self.msg_display.setMinimumHeight(350)
        # 功能按钮
        self.start_btn = QPushButton("启动服务")
        self.stop_btn = QPushButton("停止服务")
        self.start_btn.setStyleSheet("padding:10px 20px; font-size:14px;")
        self.stop_btn.setStyleSheet("padding:10px 20px; font-size:14px;")
        self.stop_btn.setEnabled(False)
        # ---------- 过滤复选框 ----------
        # self.chk_gift  = QCheckBox("礼物")
        # self.chk_gift.setChecked(True)
        # self.ws_manager.filter_flags["礼物"] = True
        # self.chk_like  = QCheckBox("点赞")
        # self.chk_light = QCheckBox("点亮")
        # self.chk_danmu = QCheckBox("弹幕")
        
        # for chk in (self.chk_gift, self.chk_like, self.chk_light, self.chk_danmu):
        #     chk.toggled.connect(self.on_filter_changed)
        
        # filter_layout = QHBoxLayout()
        # filter_layout.addWidget(QLabel("过滤条件:"))
        # filter_layout.addWidget(self.chk_gift)
        # filter_layout.addWidget(self.chk_like)
        # filter_layout.addWidget(self.chk_light)
        # filter_layout.addWidget(self.chk_danmu)
        # filter_layout.addStretch()
        
        
        # 直播平台
        self.chk_ks  = QCheckBox("快手")
        self.chk_ks.setChecked(True)
        self.ws_manager.live_pts["快手"] = True
        self.chk_dy  = QCheckBox("抖音")
        self.chk_dy.setChecked(True)
        self.ws_manager.live_pts["抖音"] = True
        
        
        self.version_label = QLabel(f"当前版本:{AppState.get_live_version()}")
        self.check_update = QPushButton("检查更新")
        
        live_pts_layout = QHBoxLayout()
        live_pts_layout.addWidget(QLabel("直播平台:"))
        live_pts_layout.addWidget(self.chk_ks)
        live_pts_layout.addWidget(self.chk_dy)
        live_pts_layout.addStretch()
        live_pts_layout.addWidget(self.version_label)
        live_pts_layout.addWidget(self.check_update)
        
        # 布局
        btn_layout = QHBoxLayout()
        btn_layout.addWidget(self.start_btn)
        btn_layout.addWidget(self.stop_btn)
        btn_layout.setSpacing(30)
        
        
        main_layout = QVBoxLayout()
        main_layout.setContentsMargins(25, 25, 25, 25)
        main_layout.setSpacing(20)
        
        
        main_layout.addLayout(serial_layout)
        main_layout.addLayout(addr_layout)
        # main_layout.addWidget(self.address_label)
        main_layout.addLayout(btn_layout)
        # main_layout.addWidget(self.msg_label)
        main_layout.addLayout(filter_msg_gif)
        main_layout.addWidget(self.msg_display)
        main_layout.addLayout(live_pts_layout)
        # main_layout.addLayout(filter_layout)
        self.setLayout(main_layout)
        # 绑定按钮事件
        self.activate_btn.toggled.connect(self.on_activate_clicked)
        self.refresh_serial_btn.clicked.connect(self.refresh_serial_ports)
        self.connect_serial_btn.clicked.connect(self.connect_serial_port)
        self.btn_lock_cfg.clicked.connect(self.open_lock_cfg)
        self.start_btn.clicked.connect(self.on_start)
        self.stop_btn.clicked.connect(self.on_stop)
        
        self.show_gift.toggled.connect(self.on_checkbox_changed)
        self.show_msg.toggled.connect(self.on_checkbox_changed)
        self.show_gift_voice.toggled.connect(self.on_checkbox_changed)
        self.chk_ks.toggled.connect(self.on_checkbox_changed)
        self.chk_dy.toggled.connect(self.on_checkbox_changed)
        self.check_update.clicked.connect(self.check_update_version)
   
        # 初始化时刷新串口列表
        self.refresh_serial_ports()
        
        # 延迟执行检测逻辑
        QTimer.singleShot(3*1000, self.check_device_status)
        
    # 通用勾选回调
    def on_checkbox_changed(self, checked):
        sender = self.sender()  # 获取触发的 QCheckBox
        if sender == self.show_gift:
            AppState.set_show_gift(checked)
            print(f"礼物勾选状态: {checked}")
        elif sender == self.show_msg:
            AppState.set_show_msg(checked)
            print(f"聊天勾选状态: {checked}")   
        elif sender == self.show_gift_voice:
            AppState.set_show_gift_voice(checked)
            print(f"礼物语音勾选状态: {checked}")
        elif sender == self.show_msg_voice:
            AppState.set_show_msg_voice(checked)
            print(f"聊天语音勾选状态: {checked}")  
        elif sender == self.chk_ks:
            AppState.set_chk_ks(checked)
            print(f"快手勾选状态: {checked}")
        elif sender == self.chk_dy:
            AppState.set_chk_dy(checked)
            print(f"抖音勾选状态: {checked}")
    def open_lock_cfg(self):
        if self.connect_serial_btn.text() == "断开":
            self.lock_cfg_win = LockSettingsWidget(self)
            self.lock_cfg_win.settings_saved.connect(self.apply_lock_cfg)  # 可选
            self.lock_cfg_win.show()   
        else:
            self.append_message("请先连接串口")
            QMessageBox.critical(self, "提示", "请先连接串口")   
    def apply_lock_cfg(self, cfg):
        # 例如把配置同步给 SerialManager 或 WebSocketManager
        print("新的锁控配置:", cfg)
    def check_device_status(self):
        """检查设备状态,包括是否过期和加载本地存储的设备数据"""
        ok, saved_code, saved_expire = load_activate_flag()
        if ok:
            logger.info(f"liveMain 输出存储的激活码:{saved_code}")
            self.action_code.setText(saved_code)
            self.code = saved_code
            self.activate_device()
        else:
            self.add_device()
    def refresh_serial_ports(self):
        """刷新串口列表"""
        self.serial_combo.clear()
        ports = get_available_ports()
        if ports:
            self.serial_combo.addItems(ports)
            self.append_message(f"发现 {len(ports)} 个可用串口,请手动连接正确COM口")
        else:
            self.append_message("未发现可用串口")
    def connect_serial_port(self):
        """连接选中的串口"""
        selected = self.serial_combo.currentText()
        if not selected:
            QMessageBox.warning(self, "提示", "请选择串口")
            return
            
        # 提取端口号(如从"COM6 - USB Serial Port"中提取"COM6")
        port = selected.split(" - ")[0]
        
        # from SerialManager import serial_manager
        if serial_manager.connect(port):
            self.append_message(f"串口 {port} 连接成功")
            self.connect_serial_btn.setText("断开")
            self.connect_serial_btn.clicked.disconnect()
            self.connect_serial_btn.clicked.connect(self.disconnect_serial_port)
        else:
            self.append_message(f"串口 {port} 连接失败")
    def disconnect_serial_port(self):
        """断开当前串口连接"""
        # from SerialManager import serial_manager
        serial_manager.close()
        self.append_message("串口已断开")
        self.connect_serial_btn.setText("连接")
        self.connect_serial_btn.clicked.disconnect()
        self.connect_serial_btn.clicked.connect(self.connect_serial_port)
   
    def setActBtStatus(self,clickStatus):
        if clickStatus==0:   
            self.activate_btn.setText("激活")
            self.activate_btn.setEnabled(True)
        elif clickStatus==1:
            self.activate_btn.setText("激活中...")
            self.activate_btn.setEnabled(False)
        elif clickStatus==2:
            self.activate_btn.setText("已激活")
            self.activate_btn.setEnabled(False)
   
    @pyqtSlot(str)
    def on_message_received(self, message):
        """接收WebSocketManager的消息并转发,确保在主线程中处理"""
        # 检查当前线程是否为主线程
        if QThread.currentThread() != self.thread():
            # 如果不是主线程,通过信号转发到主线程
            self.forward_message.emit(message)
        else:
            # 如果已经是主线程,直接显示
            self.append_message(message)
   
    # 收到礼物列表后展示
    @pyqtSlot(dict)
    def on_gift_list_received(self, data: dict):
        if not data:
            self.msg_display.append("拉取失败或为空")
            return
        # 这里只简单打印
        # logging.info(f"收到礼物列表: {data}")
        # 你可以按 data["data"] 解析成表格、下拉框等
        
               
    @pyqtSlot()
    def on_filter_changed(self):
        self.ws_manager.filter_flags["礼物"]  = self.chk_gift.isChecked()
        self.ws_manager.filter_flags["点赞"]  = self.chk_like.isChecked()
        self.ws_manager.filter_flags["点亮"] = self.chk_light.isChecked()
        self.ws_manager.filter_flags["弹幕"] = self.chk_danmu.isChecked()
    @pyqtSlot()
    def on_live_pts_changed(self):
        self.ws_manager.live_pts["抖音"]  = self.chk_dy.isChecked()
        self.ws_manager.live_pts["快手"]  = self.chk_ks.isChecked()
    @pyqtSlot()
    def on_start(self):
        # 1. 先确保旧连接断开(防止重复)
        try:
            self.http_server.data_received.disconnect(self.on_data_received)
        except TypeError:
            pass  # 没连过时抛异常,忽略
        # 2. 重新连接并启动
        self.http_server.data_received.connect(self.on_data_received)
        self.http_server.start()
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)
        logger.info("liveMain 网络服务已启动")
    @pyqtSlot()
    def on_stop(self):
        # 1. 停止服务
        self.http_server.stop()
        # 2. 断开信号(可选,也可不断)
        try:
            self.http_server.data_received.disconnect(self.on_data_received)
        except TypeError:
            pass
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        logger.info("liveMain 网络服务已停止")
    # HTTP 服务连接
    def on_data_received(self, data):
        self.append_message(str(data))
    @pyqtSlot()
    def on_thread_ready(self):
        self.append_message("线程已就绪,启动WebSocket服务...")
        self.async_thread.add_task(self.ws_manager.start())
   
    @pyqtSlot()
    def on_websocket_started(self):
        self.ws_running = True
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)
        self.append_message("WebSocket服务已成功启动")
        logger.info("liveMain 服务已启动")
    @pyqtSlot()
    def on_websocket_stopped(self):
        self.ws_running = False
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        if self.async_thread and self.async_thread.isRunning():
            self.async_thread.stop()
        self.append_message("WebSocket服务已成功停止")
        logger.info("liveMain 服务已停止")
    @pyqtSlot(str)
    def on_error(self, error_msg):
        QMessageBox.critical(self, "错误", error_msg)
        logger.error(error_msg)
        self.append_message(f"【错误】{error_msg}")
        self.ws_running = False
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)
        if self.async_thread and self.async_thread.isRunning():
            self.async_thread.stop()
    def getGiftList(self,gifList):
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
            "Cache-Control": "max-age=0",
            "Connection": "keep-alive",
            "Cookie": "clientid=3; did=web_6173d0f93508d03325bae8db40b91428",
            "Host": "live.kuaishou.com",
            "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Microsoft Edge";v="120"',
            "Sec-Ch-Ua-Mobile": "?0",
            "Sec-Ch-Ua-Platform": '"Windows"',
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0"
        }
        response = requests.get(gifList,headers=headers, timeout=8)
        # return response.json()
        # 1. 接口整体是否返回有效数据
        response = response.json()
        self.gift_list_received.emit(response)   #   
LockSettingsWidget.py
[Python] 纯文本查看 复制代码# LockSettingsWidget.py
import json, os, sys
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtWidgets import (QWidget,QDialog,QVBoxLayout, QHBoxLayout, QLabel,
                             QLineEdit, QPushButton, QSpinBox, QMessageBox,
                             QScrollArea, QGroupBox, QFrame)
from SerialManager import serial_manager   # 复用你已有的串口封装
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
# 获取当前程序目录
if getattr(sys, 'frozen', False):
    # 打包后的可执行文件运行
    base_dir = os.path.dirname(sys.executable)
else:
    # 开发环境中运行
    base_dir = os.path.dirname(__file__)
   
class LockRow(QWidget):
    """单个锁口配置行"""
    def __init__(self, idx, parent=None):
        super().__init__(parent)
        self.idx = idx
        h = QHBoxLayout(self)
        h.setContentsMargins(0, 0, 0, 0)
        h.addWidget(QLabel(f"线路{idx + 1}"))
        self.addr = QLineEdit("1")
        self.addr.setFixedWidth(50)
        h.addWidget(QLabel("地址:"))
        h.addWidget(self.addr)
        self.lock_no = QLineEdit(f"{idx + 1}")
        self.lock_no.setFixedWidth(50)
        h.addWidget(QLabel("锁号:"))
        h.addWidget(self.lock_no)
        self.gift_name = QLineEdit("")
        self.gift_name.setFixedWidth(120)
        h.addWidget(QLabel("礼物名:"))
        h.addWidget(self.gift_name)
        self.btn_test = QPushButton("测试")
        self.btn_test.clicked.connect(self.test_open)
        h.addWidget(self.btn_test)
    # ---------------- 功能 ----------------
    def test_open(self):
        addr = self.addr.text().strip()
        lock = self.lock_no.text().strip()
        gift_name = self.gift_name.text().strip()
        if not addr.isdigit() or not lock.isdigit():
            QMessageBox.warning(self, "提示", "地址和锁号必须是数字!")
            return
        if not serial_manager.is_open():
            QMessageBox.warning(self, "提示", "串口未连接!")
            return
        if addr and lock and gift_name:
            cmd = f"55{int(addr):02X}A1{int(lock):02X}00"
            serial_manager.send_command(cmd)
            QMessageBox.information(self, "测试", f"已发送:{cmd.strip()}")
        else:
            QMessageBox.warning(self, "提示", "地址、锁号礼物不能为空!")
    def get_cfg(self):
        return {
            "addr": self.addr.text().strip(),
            "lock": self.lock_no.text().strip(),
            "giftName": self.gift_name.text().strip()
        }
    def set_cfg(self, cfg):
        self.addr.setText(cfg.get("addr", "1"))
        self.lock_no.setText(cfg.get("lock", "1"))
        self.gift_name.setText(cfg.get("giftName", ""))
def cfg_path():
    """返回 lock_cfg.json 的绝对路径(兼容开发/打包)"""
    lock_cfg_path = os.path.join(base_dir, "data")
    os.makedirs(lock_cfg_path, exist_ok=True)  # 如果不存在就创建
    lock_cfg_file = os.path.join(lock_cfg_path, "lock_cfg.json")
    return lock_cfg_file
class LockSettingsWidget(QDialog):
    settings_saved = pyqtSignal(dict)   # 通知主窗口
    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("锁控板配置")
        self.resize(600, 400)
        self.setWindowModality(Qt.ApplicationModal)
        self.cfg_file = cfg_path()
        self.rows = []          # 存放 LockRow 实例
        self.init_ui()
        self._first_load = True
        self.load_cfg()
        
    # ---------- UI ----------
    def init_ui(self):
        top = QVBoxLayout(self)
        # 1. 数量控制
        h = QHBoxLayout()
        h.addWidget(QLabel("锁口数量:"))
        self.sp_count = QSpinBox()
        self.sp_count.setRange(1, 32)
        self.sp_count.setValue(8)
        self.sp_count.valueChanged.connect(self.change_count)
        h.addWidget(self.sp_count)
        h.addStretch()
        top.addLayout(h)
        # 2. 可滚动区域
        scroll = QScrollArea()
        scroll.setWidgetResizable(True)
        self.scroll_widget = QWidget()
        self.scroll_layout = QVBoxLayout(self.scroll_widget)
        self.scroll_layout.setAlignment(Qt.AlignTop)
        scroll.setWidget(self.scroll_widget)
        top.addWidget(scroll)
        # 3. 保存按钮
        self.btn_save = QPushButton("保存配置")
        self.btn_save.clicked.connect(self.save_cfg)
        top.addWidget(self.btn_save)
    # ---------- 锁口数量变更 ----------
    def change_count(self, n):
        """锁口数量变化:重建行并回填现有数据"""
        # 记录当前所有数据
        old_cfg = [row.get_cfg() for row in self.rows]
        # 清空界面
        for row in self.rows:
            row.setParent(None)
        self.rows.clear()
        # 重建 n 行
        for i in range(n):
            row = LockRow(i)
            self.scroll_layout.addWidget(row)
            self.rows.append(row)
        # 回填旧数据(超出的部分留空)
        for idx, row in enumerate(self.rows):
            if idx
SerialManager.py
[Python] 纯文本查看 复制代码import serial
import threading
import queue
import time
import logging
logging.basicConfig(
    level=logging.INFO,  # 显示 INFO 及以上级别日志
    format='%(asctime)s [%(levelname)s] %(message)s'
)
class SerialManager:
    def __init__(self, port='COM6', baudrate=19200, timeout=1, interval=0.2):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.ser = None
        self.interval = interval
        self._running = True
        self._cmd_queue = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()
    # ---------- 连接 / 关闭 ----------
    def connect(self, port=None, baudrate=None):
        if port:
            self.port = port
        if baudrate:
            self.baudrate = baudrate
        if self.ser and self.ser.is_open:
            self.ser.close()
        try:
            self.ser = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout
            )
            logging.info(f"串口 {self.port} 已打开")
            return True
        except Exception as e:
            logging.error(f"串口连接失败: {e}")
            self.ser = None
            return False
    def close(self):
        self._running = False
        self._thread.join(timeout=1)
        if self.ser and self.ser.is_open:
            self.ser.close()
        logging.info("串口已关闭")
    # ---------- 工具:异或校验 ----------
    @staticmethod
    def _with_xor(hex_str: str) -> bytes:
        """
        输入: 16 进制字符串,如 'A101'
        输出: bytes,末尾带 1 字节异或校验
        """
        data = bytes.fromhex(hex_str.replace(" ", ""))
        checksum = 0
        for b in data:
            checksum ^= b
        return data + bytes([checksum])
    # ---------- 对外发送 ----------
    def send_command(self, hex_cmd: str):
        """
        带校验后入队,由线程安全发送
        """
        frame = self._with_xor(hex_cmd)
        self._cmd_queue.put(frame.hex().upper())   # 队列里仍用 hex 字符串,方便日志
    # ---------- 后台线程 ----------
    def _worker(self):
        while self._running:
            try:
                frame_hex = self._cmd_queue.get(timeout=0.1)
                if self.ser and self.ser.is_open:
                    frame_bytes = bytes.fromhex(frame_hex)
                    self.ser.write(frame_bytes)
                    logging.info(f"TX: {frame_bytes.hex().upper()}")
                    time.sleep(self.interval)
            except queue.Empty:
                pass
            # 读回显
            if self.ser and self.ser.is_open and self.ser.in_waiting:
                rx = self.ser.read(self.ser.in_waiting)
                logging.info(f"RX: {rx.hex().upper()}")
    # ---------- 状态 ----------
    def is_open(self):
        return self.ser and self.ser.is_open
# 全局单例
serial_manager = SerialManager(interval=0.5)
tts_manager.py
[Python] 纯文本查看 复制代码import asyncio
import edge_tts
import os
import sys
import time
from threading import Thread, Lock
from queue import Queue
import pygame
class TTSManager:
    """
    TTSManager 单例类
    - 使用 edge-tts 将文字转语音
    - 将生成的音频文件保存到程序目录下的 mp3 文件夹
    - 使用单线程队列顺序播放语音,并确保临时文件被删除
    """
    _instance = None
    _lock = Lock()  # 保证单例线程安全
    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(TTSManager, cls).__new__(cls)
                # 初始化队列和工作线程
                cls._instance._queue = Queue()
                cls._instance._thread = Thread(target=cls._instance._worker, daemon=True)
                cls._instance._thread.start()
                # 初始化pygame音频系统
                pygame.mixer.init()
                # 创建一个待删除文件的列表
                cls._instance._files_to_delete = []
            return cls._instance
    async def _speak_async(self, text):
        """
        异步方法生成 TTS 并播放
        """
        # 获取程序根目录(区分开发/打包环境)
        if getattr(sys, 'frozen', False):
            # 打包后的可执行文件运行
            base_dir = os.path.dirname(sys.executable)
        else:
            # 开发环境运行
            base_dir = os.path.dirname(__file__)
        # mp3 文件存放路径
        mp3_dir = os.path.join(base_dir, "mp3")
        if not os.path.exists(mp3_dir):
            os.makedirs(mp3_dir)
        # 临时文件名,放在 mp3 目录下
        # tmp_filename = os.path.join(mp3_dir, f"{int(time.time() * 1000)}.mp3")
        tmp_filename = os.path.join(mp3_dir, "tmp.mp3")
        try:
            # 先尝试删除之前未能删除的文件
            self._cleanup_files()
            
            # 创建 edge-tts 对象
            tts = edge_tts.Communicate(text, voice="zh-CN-XiaoyiNeural") # 语音模型
            # 保存音频到文件
            await tts.save(tmp_filename)
            
            # 使用pygame播放音频
            pygame.mixer.music.load(tmp_filename)
            pygame.mixer.music.play()
            
            # 等待播放完成
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)
               
            # 明确停止播放并卸载音频,释放资源
            pygame.mixer.music.stop()
            pygame.mixer.music.unload()
            
            # 短暂延迟,确保资源释放
            time.sleep(0.1)
            
            # 尝试删除文件
            self._delete_file(tmp_filename)
            
        except Exception as e:
            print(f"TTS 处理出错: {e}")
            # 如果出错也将文件加入待删除列表
            if os.path.exists(tmp_filename):
                self._files_to_delete.append(tmp_filename)
    def _delete_file(self, filename):
        """尝试删除文件,失败则加入待删除列表"""
        try:
            os.remove(filename)
        except Exception as e:
            print(f"立即删除文件失败,将稍后重试: {e}")
            self._files_to_delete.append(filename)
   
    def _cleanup_files(self):
        """清理之前未能删除的文件"""
        if not self._files_to_delete:
            return
            
        files_remaining = []
        for filename in self._files_to_delete:
            try:
                # 先尝试直接删除
                os.remove(filename)
                print(f"成功删除之前的临时文件: {filename}")
            except:
                try:
                    # 如果直接删除失败,尝试移动到回收站
                    if os.name == 'nt':  # Windows系统
                        # import winshell
                        # winshell.recycle_bin().move(filename)
                        print(f"文件已移至回收站: {filename}")
                    else:
                        # 非Windows系统,保留待下次尝试
                        files_remaining.append(filename)
                except Exception as e:
                    print(f"清理文件 {filename} 失败: {e}")
                    files_remaining.append(filename)
        
        self._files_to_delete = files_remaining
    def _worker(self):
        """
        队列工作线程,不断获取待播放文本并顺序播放
        """
        while True:
            text = self._queue.get()
            try:
                asyncio.run(self._speak_async(text))
            except Exception as e:
                print(f"TTS 播放出错: {e}")
            self._queue.task_done()
    @classmethod
    def speak(cls, text):
        """
        对外方法,将文本加入队列,顺序播放
        """
        instance = cls()
        instance._queue.put(text)
# -----------------------------
# 测试方法
# -----------------------------
def test_tts_sequence():
    """
    测试 TTS 顺序播放
    """
    print("测试开始:依次播放三条语音")
    TTSManager.speak("第一条消息,测试顺序播放")
    TTSManager.speak("第二条消息,应该在第一条播完后播放")
    TTSManager.speak("第三条消息,最后播放")
    # 保证主线程不退出,给 TTS 播放留时间
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # 程序退出前最后尝试清理文件
        tts = TTSManager()
        tts._cleanup_files()
        print("测试结束")
if __name__ == "__main__":
    test_tts_sequence()
   
HttpServer.py
[Python] 纯文本查看 复制代码# HttpServer.py
import json,sys,os
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from PyQt5.QtCore import QObject, pyqtSignal
from SerialManager import serial_manager  
import logging
from app_state import AppState
from tts_manager import TTSManager
logging.basicConfig(
    level=logging.INFO,  # 显示 INFO 及以上级别日志
    format='%(asctime)s [%(levelname)s] %(message)s'
)
# 获取当前程序目录
if getattr(sys, 'frozen', False):
    # 打包后的可执行文件运行
    base_dir = os.path.dirname(sys.executable)
else:
    # 开发环境中运行
    base_dir = os.path.dirname(__file__)
   
def cfg_path():
    """返回 lock_cfg.json 的绝对路径(兼容开发/打包)"""
    # if getattr(sys, 'frozen', False):
    #     # 打包后运行
    #     return os.path.join(os.path.dirname(sys.executable), "lock_cfg.json")
    # else:
    #     # 开发环境
    #     return os.path.join(os.path.dirname(__file__), "lock_cfg.json")
    lock_cfg_path = os.path.join(base_dir, "data")
    os.makedirs(lock_cfg_path, exist_ok=True)  # 如果不存在就创建
    lock_cfg_file = os.path.join(lock_cfg_path, "lock_cfg.json")
    return lock_cfg_file
def load_cfg():
        """第一次打开时读取 json"""
        try:
            with open(cfg_path(), "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {"count": 0, "list": []}
      
class HttpHandler(BaseHTTPRequestHandler):
    """HTTP 请求处理类"""
    server_instance = None  # 用于传递 PyQt 信号
   
               
    def _set_headers(self, code=200):
        self.send_response(code)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
   
    def checkGift(self,gifName:str):
        lockDatas = load_cfg()  
        if isinstance(lockDatas, dict) and 'list' in lockDatas:
            for lockItem in lockDatas['list']:
                if isinstance(lockItem, dict) and (lockItem.get('giftName') in gifName or lockItem.get('giftName') == gifName) :
                    return lockItem
        return None
    def getSetGifts(self):
        lockDatas = load_cfg()  
        return lockDatas
    # 执行硬件处理
    def sendCmd(self,cmd:str):
        if serial_manager.is_open():
            serial_manager.send_command(cmd)
        else:
            logging.warning(f"串口未连接,请检查!")
    def emitSend(self,data:str):
        if HttpHandler.server_instance:
                HttpHandler.server_instance.data_received.emit(f"{data}")
    # 赠送礼物真是名称
    def getRealGifName(self,data:dict):
        if data.get("giftName"):
            giftName = data['giftName'].replace('送', '').replace('出', '').replace('了', '')
            return giftName
        else:
            return None
    def voice(self,data:dict):
        if AppState.get_show_gift_voice:
            try:
                if data['nickName']:
                    # TTSManager.speak(f"感谢{data['nickName']}送的{data['giftName'].replace('送', '').replace('出', '')}")
                    TTSManager.speak(f"感谢{data['nickName']}送的{self.getRealGifName(data)}")
                else:
                    # TTSManager.speak(f"感谢 这个名字咋读 送的{data['giftName'].replace('送', '').replace('出', '')}")
                    TTSManager.speak(f"感谢 这个名字咋读 送的{self.getRealGifName(data)}")
            except Exception as e:
                    logging.error(f"礼物 播报语音失败{e}")
        if AppState.get_show_msg_voice:
            if data.get("msg"):
                if AppState.get_show_msg_voice:
                    if data['nickName']:
                        TTSManager.speak(f"恭喜{data['nickName']}触发关键词,赠送一个笑脸")
                    else:
                        TTSManager.speak(f"感谢 这个名字咋读 送的{data['giftName']}触发关键词,赠送一个笑脸")
               
                    
    def do_POST(self):
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length)
        try:
            data = json.loads(post_data)
            logging.info(f"Received data: {data}")
            # self.emitSend(f"{data}")
            allSetGifts = self.getSetGifts()
            if  allSetGifts["count"] == 0 or not allSetGifts["list"]:
                self.emitSend(f"请配置控制器礼物数据")
                return
            if AppState.get_chk_dy():
                if '某某' in data['from']:
                    logging.info(f"'某某' in data['from']:= {'某某' in data['from']}")
                    if '礼物' == data.get('source'):
                        logging.info(f"data['source'] == '礼物':= {data['source'] == '礼物'}")
                        gifName = data['giftName'].replace("送出", "")
                        logging.info(f"某某礼物name:= {gifName}")
                        self.voice(data)
                        lockItem = self.checkGift(gifName)
                        if lockItem:
                            logging.info(f"某某礼物指令:= {lockItem['cmd']}")
                            logging.info(f"某某礼物发送勾选状态{AppState.get_show_gift()}")
                            # 发送信号到 GUI
                            if AppState.get_show_gift():
                                self.emitSend(f"礼物---{data['from']}{data['nickName']}赠送:{self.getRealGifName(data)}---预备:砰……")   
                                #serial_manager.send_command(lockItem['cmd'])
                                self.sendCmd(lockItem['cmd'])
                        else:
                            logging.info(f"某某礼物发送勾选状态{AppState.get_show_gift()}")
                            if AppState.get_show_gift():
                                logging.warning(f"某某礼物配置:{gifName}不存在,不执行")
                                self.emitSend(f"礼物---{data['from']}{data['nickName']}赠送:{self.getRealGifName(data)}---未设置礼物不执行砰……")   
                           
                    if "聊天" == data.get('source'):
                        if AppState.get_show_msg():
                            self.emitSend(f"聊天---{data['from']}:{data.get('nickName')}:{data.get('msg')}")
                        self.voice(data)
                        if '给我玩一次,看我6不6' in data.get("msg"):
                            if AppState.get_show_msg():
                                # 发送信号到 GUI
                                self.emitSend(f"聊天---{data['from']}-观众-{data['msg']}:---触发预设关键字指令 预备:砰……")   
                                #serial_manager.send_command(lockItem['cmd'])
                                self.sendCmd(lockItem['cmd'])
                                   
            if AppState.get_chk_ks():            
                if '某某' in data['from']:
                    logging.info(f"'某某' in data['from']:= {'某某' in data['from']}")
                    if '礼物' == data.get('source'):
                        logging.info(f"data['source'] == '礼物':= {data['source'] == '礼物'}")
                        gifName = data['giftName'].replace("送出", "")
                        logging.info(f"某某礼物name:= {gifName}")
                        self.voice(data)
                        lockItem = self.checkGift(gifName)
                        if lockItem:
                            logging.info(f"某某礼物指令:= {lockItem['cmd']}")
                            logging.info(f"某某礼物发送勾选状态{AppState.get_show_gift()}")
                            # 发送信号到 GUI
                            if AppState.get_show_gift():
                                self.emitSend(f"礼物---{data['from']}{data['nickName']}赠送:{self.getRealGifName(data)}---预备:砰……")   
                                #serial_manager.send_command(lockItem['cmd'])
                                self.sendCmd(lockItem['cmd'])
                        else:
                            logging.info(f"某某礼物发送勾选状态{AppState.get_show_gift()}")
                            logging.warning(f"某某礼物配置:{self.getRealGifName(data)}不存在,不执行")
                            # 发送信号到 GUI
                            if AppState.get_show_gift():
                                self.emitSend(f"礼物---{data['from']}{data['nickName']}赠送:{self.getRealGifName(data)}---未设置礼物不执行砰……")   
                                #serial_manager.send_command(lockItem['cmd'])
                    if "聊天" == data.get('source'):
                        self.voice(data)
                        if AppState.get_show_msg():
                            if data.get("msg"):
                                self.emitSend(f"聊天---{data['from']}:{data.get('nickName')}:{data.get('msg')}")
                        if data.get("msg"):
                            if '给我玩一次,看我6不6' in data.get("msg"):
                                if AppState.get_show_msg():
                                    # 发送信号到 GUI
                                    self.emitSend(f"聊天---{data['from']}-观众-{data['msg']}:---预备:砰……")   
                                    #serial_manager.send_command(lockItem['cmd'])
                                    self.sendCmd(lockItem['cmd'])
      
        except json.JSONDecodeError:
            data = {"error": "Invalid JSON"}
        
        # # 发送信号到 GUI
        # if HttpHandler.server_instance:
        #     HttpHandler.server_instance.data_received.emit(data)
        
        self._set_headers()
        self.wfile.write(json.dumps({"status": "ok"}).encode('utf-8'))
class HttpServer(QObject):
    """封装 HTTPServer 的 PyQt 对象"""
    data_received = pyqtSignal(str)
    def __init__(self, host="0.0.0.0", port=8766):
        super().__init__()
        self.host = host
        self.port = port
        self.httpd = None
        self._thread = None
        
   
    def start(self):
        if self._thread and self._thread.is_alive():
            logging.info("HTTP Server already running")
            return
        # 重新创建 HTTPServer 和 Thread
        self.httpd = HTTPServer((self.host, self.port), HttpHandler)
        HttpHandler.server_instance = self
        self._thread = threading.Thread(target=self._run_server, daemon=True)
        self._thread.start()
        logging.info(f"HTTP Server started at http://{self.host}:{self.port}")
        self.data_received.emit(f"HTTP Server started at http://{self.host}:{self.port}")
    def _run_server(self):
        try:
            self.httpd.serve_forever()
        except Exception as e:
            # 修复日志格式
            logging.info("HTTP Server stopped: %s", e)
    def stop(self):
        if self.httpd:
            self.httpd.shutdown()
            self.httpd.server_close()
            self.httpd = None
        logging.info("HTTP Server stopped")
        self.data_received.emit("HTTP Server stopped")
除了升级 功能 其他代码都在这儿了。
升级那个 不会玩 单独打个了 .exe 文件 然后 主程序调用了一下 ,感觉很怪异。
python Gui 程序 升级 都是咋搞的????

礼物, 串口

zhengduimen
OP
  


lsb2pojie 发表于 2025-9-2 08:36
长见识了,这种自动化还挺有意思,推倒后能自动摆放就更复杂了

自动摆放 说实话没必要 这类型的  直播 抖音一般推流 按我了解到的 基本流量周期 在个把月 时间到了 就得换个别的方案 ,例如 第一个月扎气球 慢慢就不给流量了,再换推砖块  然后 换 扎沙袋 等等 隔一段时间就得想个新招。
再续前缘   

搞不好后面人工的
michener   


再续前缘 发表于 2025-9-1 18:38
搞不好后面人工的

真不是人推 是实时的  淘宝有卖这种礼物智能硬件的
博士   

以前写过 但是网上软件才88一年果断放弃了
anwen   


再续前缘 发表于 2025-9-1 18:38
搞不好后面人工的

那不至于..抓取礼物和硬件联动也不难.
singer1   

666,长见识了
52kail   

学习到了
lsb2pojie   

长见识了,这种自动化还挺有意思,推倒后能自动摆放就更复杂了
zhengduimen
OP
  


再续前缘 发表于 2025-9-1 18:38
搞不好后面人工的

  这种前期也确实很多是人工推的 ,但那都是几年前的事儿了。现在基本都是自动。
您需要登录后才可以回帖 登录 | 立即注册

返回顶部