使用 AI 生成的蓝牙串口调试程序

查看 46|回复 4
作者:lx136648   
目前仅支持 BLE。本来是可以支持其他蓝牙协议的,可是不知道怎么回事,pybluez 这个包怎么都导入不进去,没办法只能用 BLE 了,希望有大佬改进一下。
[Python] 纯文本查看 复制代码
import sys
import asyncio
import threading
import binascii
import os
import uuid # For generating unique IDs for buttons
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
    QTextEdit, QPushButton, QListWidget, QComboBox, QCheckBox,
    QLabel, QLineEdit, QSplitter, QMessageBox, QStatusBar,
    QSizePolicy, QSpinBox, QAction, QMenu, QWidgetAction,
    QDialog, QFormLayout, QDialogButtonBox, QRadioButton, QGridLayout,
    QTabWidget,
)
from PyQt5.QtCore import Qt, pyqtSignal, QObject, QTimer, QSettings, QCoreApplication
from PyQt5.QtGui import QTextCursor, QFont
# --- Constants ---
# UUIDs of the UART-style GATT service this tool looks for first (the log
# messages refer to it as "NUS"; presumably the Nordic UART Service — confirm
# against the target firmware).
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
# "RX" from the peripheral's point of view: AsyncWorker uses it as the
# characteristic the PC writes outgoing data to.
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
# "TX" from the peripheral's point of view: AsyncWorker subscribes to
# notifications on it to receive incoming data.
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# --- Button Configuration Dialog ---
class ButtonConfigDialog(QDialog):
    """Dialog for creating or editing the configuration of one custom button.

    The dialog edits a shallow copy of the dict passed in, so the caller's
    dict is untouched until it reads the result back through get_config().
    Keys written on accept: "name", "press_data", "press_hex",
    "release_data", "release_hex", "mode" ("click" or "long_press") and
    "long_press_interval" (milliseconds). When the user confirms deletion,
    the key "_to_be_deleted_" is set to True instead and the other fields
    are left as they were.
    """

    def __init__(self, button_config=None, parent=None, is_new_button=True):
        """Build the form.

        Args:
            button_config: Existing configuration dict to pre-fill the form
                with, or None/empty for a blank form.
            parent: Parent widget passed to QDialog.
            is_new_button: True when creating a button; False when editing,
                which also adds the "delete this button" action.
        """
        super().__init__(parent)
        self.is_new_button = is_new_button
        if is_new_button:
            self.setWindowTitle("新建按钮")
        else:
            self.setWindowTitle("设置按钮")
        self.setMinimumWidth(400)
        self.config = button_config.copy() if button_config else {}
        cfg = self.config
        form = QFormLayout(self)

        # Button caption.
        self.name_edit = QLineEdit(cfg.get("name", ""))
        form.addRow("按钮名称:", self.name_edit)

        # Payload sent when the button is pressed, plus its HEX flag.
        self.press_data_label = QLabel("按下时发送的数据:")
        self.press_data_edit = QTextEdit()
        self.press_data_edit.setPlainText(cfg.get("press_data", ""))
        self.press_data_edit.setFixedHeight(80)
        self.press_data_edit.setAcceptRichText(False)
        form.addRow(self.press_data_label, self.press_data_edit)
        self.press_data_hex_checkbox = QCheckBox("HEX")
        self.press_data_hex_checkbox.setChecked(cfg.get("press_hex", False))
        press_hex_row = QHBoxLayout()
        press_hex_row.addWidget(self.press_data_hex_checkbox)
        press_hex_row.addStretch()
        form.addRow("", press_hex_row)

        # Optional payload sent when the button is released, plus its HEX flag.
        self.release_data_label = QLabel("松开时发送的数据 (可选):")
        self.release_data_edit = QTextEdit()
        self.release_data_edit.setPlainText(cfg.get("release_data", ""))
        self.release_data_edit.setFixedHeight(60)
        self.release_data_edit.setAcceptRichText(False)
        form.addRow(self.release_data_label, self.release_data_edit)
        self.release_data_hex_checkbox = QCheckBox("HEX")
        self.release_data_hex_checkbox.setChecked(cfg.get("release_hex", False))
        release_hex_row = QHBoxLayout()
        release_hex_row.addWidget(self.release_data_hex_checkbox)
        release_hex_row.addStretch()
        form.addRow("", release_hex_row)

        # Send mode: single click vs. repeated sending while held.
        mode_container = QWidget()
        mode_row = QHBoxLayout(mode_container)
        self.click_mode_radio = QRadioButton("点击模式")
        self.long_press_mode_radio = QRadioButton("长按模式")
        mode_row.addWidget(self.click_mode_radio)
        mode_row.addWidget(self.long_press_mode_radio)
        mode_row.addStretch()
        starts_in_long_press = cfg.get("mode", "click") == "long_press"
        initial_radio = self.long_press_mode_radio if starts_in_long_press else self.click_mode_radio
        initial_radio.setChecked(True)
        form.addRow("发送模式:", mode_container)

        # Repeat interval; only editable while long-press mode is selected.
        self.long_press_interval_spinbox = QSpinBox()
        self.long_press_interval_spinbox.setRange(50, 5000)
        self.long_press_interval_spinbox.setSingleStep(50)
        self.long_press_interval_spinbox.setSuffix(" ms")
        self.long_press_interval_spinbox.setValue(cfg.get("long_press_interval", 500))
        form.addRow("长按发送间隔:", self.long_press_interval_spinbox)
        self.long_press_interval_spinbox.setEnabled(self.long_press_mode_radio.isChecked())
        self.long_press_mode_radio.toggled.connect(self.long_press_interval_spinbox.setEnabled)
        self.click_mode_radio.toggled.connect(self.long_press_interval_spinbox.setDisabled)

        # OK / Cancel, and a destructive "delete" action when editing.
        self.button_box = QDialogButtonBox()
        self.button_box.addButton(QDialogButtonBox.Ok)
        self.button_box.addButton(QDialogButtonBox.Cancel)
        if not self.is_new_button:
            delete_button = self.button_box.addButton("删除此按钮", QDialogButtonBox.DestructiveRole)
            delete_button.clicked.connect(self.handle_delete_this_button_signal)
        self.button_box.accepted.connect(self.accept)
        self.button_box.rejected.connect(self.reject)
        form.addRow(self.button_box)

    def handle_delete_this_button_signal(self):
        """Ask for confirmation, then flag the config for deletion and accept."""
        answer = QMessageBox.question(
            self,
            "确认删除",
            f"确定要永久删除按钮 '{self.config.get('name', '此按钮')}' 吗?",
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No,
        )
        if answer != QMessageBox.Yes:
            return
        self.config["_to_be_deleted_"] = True
        self.accept()

    def update_internal_config(self):
        """Copy the current widget values into self.config."""
        long_press_selected = self.long_press_mode_radio.isChecked()
        self.config.update({
            "name": self.name_edit.text().strip(),
            "press_data": self.press_data_edit.toPlainText(),
            "press_hex": self.press_data_hex_checkbox.isChecked(),
            "release_data": self.release_data_edit.toPlainText(),
            "release_hex": self.release_data_hex_checkbox.isChecked(),
            "mode": "long_press" if long_press_selected else "click",
            "long_press_interval": self.long_press_interval_spinbox.value(),
        })

    def accept(self):
        """Store the form values (unless deletion was requested) and close."""
        marked_for_deletion = self.config.get("_to_be_deleted_")
        if not marked_for_deletion:
            self.update_internal_config()
        super().accept()

    def get_config(self):
        """Return the dialog's working configuration dict."""
        return self.config
# --- AsyncWorker for BLE Operations (Formatted Correctly) ---
class AsyncWorker(QObject):
    """Runs bleak BLE coroutines on a private asyncio loop in a daemon thread.

    The public methods (scan/connect/disconnect/send) only schedule a
    coroutine on that loop with asyncio.run_coroutine_threadsafe; outcomes
    are reported through the Qt signals declared below.

    Connection state is shared with the caller through ``client_ref``, a dict
    with the keys 'client', 'write_char_uuid' and 'notify_char_uuid'. The
    worker fills it in on connect and resets all three to None on
    disconnect or failure.
    """

    finished = pyqtSignal()                           # a scan or connect coroutine has ended
    error = pyqtSignal(str)                           # generic error text
    ble_device_found_signal = pyqtSignal(str, str)    # (name, address) per scanned device
    ble_connection_success_signal = pyqtSignal(str)   # device name (or address)
    ble_connection_failed_signal = pyqtSignal(str)    # failure description
    ble_disconnected_signal = pyqtSignal()            # result of a requested disconnect
    ble_data_received_signal = pyqtSignal(bytes)      # payload of one notification
    ble_unexpected_disconnected_signal = pyqtSignal(str)  # address reported by bleak's disconnect callback

    def __init__(self):
        super().__init__()
        self.loop = None          # asyncio loop owned by the worker thread
        self.thread = None        # thread running self.loop
        self.current_task = None  # concurrent Future of the active connect coroutine

    @staticmethod
    def _reset_client_ref(client_ref):
        """Reset the shared connection dict in place to its 'not connected' shape."""
        client_ref.clear()
        client_ref.update({'client': None, 'write_char_uuid': None, 'notify_char_uuid': None})

    def _run_async_loop(self):
        """Thread target: create the event loop, run it until stopped, then close it."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_forever()
        finally:
            self.loop.close()

    def start_event_loop_thread(self):
        """Start the loop thread unless one is already alive."""
        if self.thread is None or not self.thread.is_alive():
            self.thread = threading.Thread(target=self._run_async_loop, daemon=True)
            self.thread.start()

    def stop_event_loop_thread(self):
        """Ask the loop to stop and wait up to one second for the thread to exit."""
        if self.loop and self.loop.is_running():
            self.loop.call_soon_threadsafe(self.loop.stop)
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=1)
            if self.thread.is_alive():
                print("Warning: Async BLE thread did not terminate cleanly.")
        self.thread = None
        self.loop = None

    async def _scan_ble_devices(self):
        """Scan for 5 seconds and emit ble_device_found_signal for each result."""
        try:
            devices = await BleakScanner.discover(timeout=5.0)
            for device in devices:
                name = device.name if device.name else "Unknown Device"
                self.ble_device_found_signal.emit(name, device.address)
        except Exception as e:
            self.error.emit(f"BLE Scan error: {str(e)}")
        finally:
            self.finished.emit()

    def scan_ble_devices(self):
        """Schedule a scan; emits `error` if the loop is not running."""
        if self.loop and self.loop.is_running():
            asyncio.run_coroutine_threadsafe(self._scan_ble_devices(), self.loop)
        else:
            self.error.emit("BLE async loop not running.")

    @staticmethod
    def _find_uart_characteristics(client):
        """Pick the write and notify characteristics to use on `client`.

        The UART service identified by UART_SERVICE_UUID is tried first. If
        it does not provide both characteristics, every service is walked
        and the first characteristic advertising "write" / "notify" is taken
        for whichever role is still missing.

        Returns:
            (write_char, notify_char); either may be None when not found.
        """
        write_char_obj, notify_char_obj = None, None
        try:
            nus_service = client.services.get_service(UART_SERVICE_UUID)
            if nus_service:
                write_char_obj = nus_service.get_characteristic(UART_RX_CHAR_UUID)
                notify_char_obj = nus_service.get_characteristic(UART_TX_CHAR_UUID)
                if write_char_obj and notify_char_obj:
                    print("Async BLE: Found NUS chars.")
        except Exception as e_n:
            print(f"Async BLE: NUS lookup: {e_n}")

        if not (write_char_obj and notify_char_obj):
            print("Async BLE: NUS not full. Searching generic...")
            for service in client.services:
                for char_in_service in service.characteristics:
                    if "write" in char_in_service.properties and not write_char_obj:
                        write_char_obj = char_in_service
                    if "notify" in char_in_service.properties and not notify_char_obj:
                        notify_char_obj = char_in_service
                # This point is only reached when the UART service was
                # incomplete, so stop as soon as both roles are filled.
                if write_char_obj and notify_char_obj:
                    break
        return write_char_obj, notify_char_obj

    async def _connect_ble_device(self, address, client_ref):
        """Connect to `address`, subscribe to notifications and stay connected.

        On success, fills `client_ref`, emits ble_connection_success_signal
        and then polls `client.is_connected` until the link drops or the
        task is cancelled. Any failure is reported through
        ble_connection_failed_signal. In every case `client_ref` is reset
        and `finished` is emitted (while the loop is running) on exit.
        """
        def handle_unexpected_disconnect(client_instance_backend):
            # Registered as bleak's disconnected callback; the argument is
            # expected to expose `.address` (checked defensively).
            device_address_str = "未知设备"
            try:
                if hasattr(client_instance_backend, 'address'):
                    device_address_str = client_instance_backend.address
                print(f"BLEAK_CALLBACK: Device {device_address_str} disconnected unexpectedly.")
            except Exception as e_cb:
                print(f"BLEAK_CALLBACK: Error in disconnect handler: {e_cb}")

            if self.loop and self.loop.is_running():
                self.loop.call_soon_threadsafe(self.ble_unexpected_disconnected_signal.emit, device_address_str)
            else:
                self.ble_unexpected_disconnected_signal.emit(device_address_str)

        print(f"Async BLE: Connecting to {address}...")
        self._reset_client_ref(client_ref)
        try:
            # The callback is passed to the constructor rather than through
            # client.set_disconnected_callback(): that method is deprecated
            # in bleak, and calling it on a release that no longer has it
            # would abort every connection attempt.
            async with BleakClient(
                address,
                disconnected_callback=handle_unexpected_disconnect,
                timeout=15.0,
            ) as client:
                client_ref['client'] = client
                device_name_for_signal = client.name if hasattr(client, 'name') and client.name else client.address
                print(f"Async BLE: Context for {client.address} ({device_name_for_signal})")
                print(f"Async BLE: Disconnected cb set for {client.address}")
                await asyncio.sleep(0.5)
                print(f"Async BLE: Discovering services for {client.address}...")
                # get_services() is a legacy bleak API; only call it where it
                # still exists and rely on `client.services` otherwise.
                legacy_get_services = getattr(client, "get_services", None)
                if legacy_get_services is not None:
                    try:
                        await legacy_get_services()
                    except Exception as e_gs:
                        print(f"Async BLE: Warn - Service discovery error: {e_gs}")

                print(f"Async BLE: Finding characteristics for {client.address}")
                write_char_obj, notify_char_obj = self._find_uart_characteristics(client)

                if not write_char_obj:
                    self.ble_connection_failed_signal.emit(f"BLE: {device_name_for_signal} - No writable char")
                    return
                if not notify_char_obj:
                    self.ble_connection_failed_signal.emit(f"BLE: {device_name_for_signal} - No notifiable char")
                    return
                client_ref['write_char_uuid'] = write_char_obj.uuid
                client_ref['notify_char_uuid'] = notify_char_obj.uuid
                print(f"Async BLE: Using W:{write_char_obj.uuid}, N:{notify_char_obj.uuid}")

                try:
                    def notification_handler_wrapper(sender, data_):
                        # Forward each notification payload to the UI as immutable bytes.
                        self.ble_data_received_signal.emit(bytes(data_))
                    await client.start_notify(notify_char_obj.uuid, notification_handler_wrapper)
                    print(f"Async BLE: Notifications started on {notify_char_obj.uuid}.")
                except Exception as e_not:
                    self.ble_connection_failed_signal.emit(f"BLE: {device_name_for_signal} - Start notify failed: {str(e_not)[:50]}")
                    return

                self.ble_connection_success_signal.emit(device_name_for_signal)
                print(f"Async BLE: Connection OK for {client.address}. Keep-alive...")
                # Keep the `async with` context (and thus the connection)
                # open until the link drops or this task is cancelled.
                while client.is_connected:
                    await asyncio.sleep(0.2)
                print(f"Async BLE: Keep-alive exited for {client.address}. Connected: {client.is_connected if client else 'N/A'}.")
        except BleakError as e_be:
            self.ble_connection_failed_signal.emit(f"BLE Connection Error: {str(e_be)}")
        except asyncio.TimeoutError:
            self.ble_connection_failed_signal.emit(f"BLE Connection Timeout: {address}")
        except Exception as e_gen:
            import traceback
            traceback.print_exc()
            self.ble_connection_failed_signal.emit(f"BLE Unknown Connection Error: {str(e_gen)}")
        finally:
            print(f"Async BLE: _connect_ble_device finally for {address}.")
            self._reset_client_ref(client_ref)
            if self.loop and self.loop.is_running():
                self.finished.emit()
            print(f"Async BLE: _connect_ble_device for {address} fully finished (finally).")

    def connect_ble_device(self, address, client_ref):
        """Schedule a connection attempt and remember its future in current_task.

        If the loop is not running the attempt cannot be made; this is
        reported through ble_connection_failed_signal instead of being
        dropped silently.
        """
        if self.loop and self.loop.is_running():
            self.current_task = asyncio.run_coroutine_threadsafe(self._connect_ble_device(address, client_ref), self.loop)
        else:
            self.ble_connection_failed_signal.emit("BLE async loop not running.")

    async def _disconnect_ble_device(self, client_ref):
        """Stop notifications, disconnect, reset `client_ref`, emit ble_disconnected_signal."""
        client = client_ref.get('client')
        if client and client.is_connected:
            try:
                if client_ref.get('notify_char_uuid'):
                    await client.stop_notify(client_ref['notify_char_uuid'])
                await client.disconnect()
            except Exception as e_d:
                self.error.emit(f"BLE Disconnect error: {str(e_d)}")
            finally:
                self._reset_client_ref(client_ref)
                self.ble_disconnected_signal.emit()
        else:
            self.ble_disconnected_signal.emit()

    def disconnect_ble_device(self, client_ref):
        """Cancel a pending connect task and schedule the disconnect.

        Without a running loop the shared state is simply reset and
        ble_disconnected_signal is emitted directly.
        """
        if self.loop and self.loop.is_running():
            if self.current_task and not self.current_task.done():
                self.current_task.cancel()
            asyncio.run_coroutine_threadsafe(self._disconnect_ble_device(client_ref), self.loop)
        else:
            self._reset_client_ref(client_ref)
            self.ble_disconnected_signal.emit()

    async def _send_ble_data(self, client_ref, data):
        """Write `data` to the selected write characteristic; report problems via `error`."""
        client = client_ref.get('client')
        write_uuid = client_ref.get('write_char_uuid')
        if client and client.is_connected and write_uuid:
            try:
                await client.write_gatt_char(write_uuid, data)
            except Exception as e_s:
                self.error.emit(f"BLE Send error: {str(e_s)}")
        elif not client or not client.is_connected:
            self.error.emit("BLE: Not connected.")
        elif not write_uuid:
            self.error.emit("BLE: No write characteristic.")

    def send_ble_data(self, client_ref, data):
        """Schedule a write; emits `error` if the loop is not running."""
        if self.loop and self.loop.is_running():
            asyncio.run_coroutine_threadsafe(self._send_ble_data(client_ref, data), self.loop)
        else:
            self.error.emit("BLE async loop not running.")
# --- BLESerialAssistant Class (Main Application Window) ---
# The BLESerialAssistant class definition (from __init__ to the end)
# should be taken from the previous "生成修改后的完整代码" response where its methods
# were already correctly formatted into multiple lines.
# I will re-paste it here for completeness, ensuring all its methods are also multi-line.
class BLESerialAssistant(QMainWindow):
    DEFAULT_FONT_SIZE = 9
    ORGANIZATION_NAME = "MyPortableApps"
    APPLICATION_NAME = "BLESerialAssistant"
    SETTINGS_FILENAME = "ble_config.ini"
    MAX_CUSTOM_BUTTONS = 12
    def __init__(self):
        super().__init__()
        self.setWindowTitle("PC BLE Serial Assistant")
        self.ble_client_ref = {'client': None, 'write_char_uuid': None, 'notify_char_uuid': None}
        self.devices_discovered = {}
        self.receive_count = 0; self.send_count = 0
        self.custom_buttons_config = []; self.custom_button_widgets = []
        self.edit_mode_enabled = False; self.long_press_timers = {}
        self.selected_button_for_move_source_id = None
        self.async_worker = AsyncWorker()
        QCoreApplication.setOrganizationName(self.ORGANIZATION_NAME)
        QCoreApplication.setApplicationName(self.APPLICATION_NAME)
        if getattr(sys, 'frozen', False): application_path = os.path.dirname(sys.executable)
        elif __file__: application_path = os.path.dirname(os.path.abspath(__file__))
        else: application_path = os.getcwd()
        self.settings_file_path = os.path.join(application_path, self.SETTINGS_FILENAME)
        self.settings = QSettings(self.settings_file_path, QSettings.IniFormat)
        self.init_ui_with_tabs()
        self.async_worker.start_event_loop_thread()
        self.load_settings()
        self.async_worker.ble_device_found_signal.connect(self.add_ble_device_to_list)
        self.async_worker.ble_connection_success_signal.connect(self.on_ble_connection_success)
        self.async_worker.ble_connection_failed_signal.connect(self.on_ble_connection_failed)
        self.async_worker.ble_disconnected_signal.connect(self.on_ble_disconnected)
        self.async_worker.ble_unexpected_disconnected_signal.connect(self.on_ble_unexpected_disconnected)
        self.async_worker.ble_data_received_signal.connect(self.on_data_received)
        self.async_worker.error.connect(self.show_error_message)
        self.async_worker.finished.connect(self.on_async_task_finished)
        self.update_status("未连接 (Idle)", r=0, s=0)
        self.send_timer = QTimer(self)
        self.send_timer.timeout.connect(self.send_data_from_ui)
    def init_ui_with_tabs(self):
        """Build the whole window: menu bar, the two tabs and the status bar.

        Tab 1 ("蓝牙串口") holds the device list and receive view on the left
        and the BLE / data-processing / send controls on the right.
        Tab 2 ("按键控制") holds the edit-mode checkbox and the grid that
        hosts the custom buttons. Widgets that other methods need are stored
        as attributes on self.
        """
        # --- Menu bar: View -> font size (a spin box embedded in the menu) ---
        menubar = self.menuBar()
        view_menu = menubar.addMenu('视图(&V)')
        font_size_menu = QMenu('字体大小', self)
        self.font_size_spinbox = QSpinBox()
        self.font_size_spinbox.setRange(7, 24)
        self.font_size_spinbox.setSuffix(" pt")
        self.font_size_spinbox.valueChanged.connect(self.change_font_size_from_spinbox)
        font_widget_action = QWidgetAction(self)
        font_widget_action.setDefaultWidget(self.font_size_spinbox)
        font_size_menu.addAction(font_widget_action)
        view_menu.addMenu(font_size_menu)
        
        # --- Central tab widget ---
        self.tab_widget = QTabWidget()
        self.setCentralWidget(self.tab_widget)
        
        # --- Tab 1: BLE serial operations ---
        self.ble_operations_tab = QWidget()
        self.tab_widget.addTab(self.ble_operations_tab, "蓝牙串口")
        ble_ops_main_layout = QHBoxLayout(self.ble_operations_tab)
        
        # Left side: vertical splitter with the device list above the receive view.
        self.left_splitter = QSplitter(Qt.Vertical)
        self.device_list_widget = QListWidget()
        # Double-clicking a list entry starts a connection to it.
        self.device_list_widget.itemDoubleClicked.connect(self.connect_selected_device)
        self.receive_text_edit = QTextEdit()
        self.receive_text_edit.setReadOnly(True)
        self.left_v_layout1_label = QLabel("蓝牙列表 (BLE)")
        left_v_layout1_content = QWidget()
        left_v_layout1 = QVBoxLayout(left_v_layout1_content)
        left_v_layout1.addWidget(self.left_v_layout1_label)
        left_v_layout1.addWidget(self.device_list_widget)
        self.left_v_layout2_label = QLabel("数据接收")
        left_v_layout2_content = QWidget()
        left_v_layout2 = QVBoxLayout(left_v_layout2_content)
        left_v_layout2.addWidget(self.left_v_layout2_label)
        left_v_layout2.addWidget(self.receive_text_edit)
        self.left_splitter.addWidget(left_v_layout1_content)
        self.left_splitter.addWidget(left_v_layout2_content)
        
        # Right side, group 1: scan / disconnect controls.
        self.right_pane_widget_tab1 = QWidget()
        right_v_layout_tab1 = QVBoxLayout(self.right_pane_widget_tab1)
        self.ble_op_label = QLabel("蓝牙操作 (BLE)")
        ble_op_group_widget = QWidget()
        ble_op_layout = QVBoxLayout(ble_op_group_widget)
        ble_op_layout.addWidget(self.ble_op_label)
        self.scan_type_fixed_label = QLabel("扫描蓝牙类型: BLE蓝牙")
        scan_type_layout = QHBoxLayout()
        scan_type_layout.addWidget(self.scan_type_fixed_label)
        ble_op_layout.addLayout(scan_type_layout)
        ble_buttons_layout = QHBoxLayout()
        self.disconnect_btn = QPushButton("断开蓝牙")
        self.disconnect_btn.clicked.connect(self.disconnect_current_device)
        # Starts disabled; it is not re-enabled anywhere in this method.
        self.disconnect_btn.setEnabled(False)
        self.scan_btn = QPushButton("扫描蓝牙")
        self.scan_btn.clicked.connect(self.start_scan)
        ble_buttons_layout.addWidget(self.disconnect_btn)
        ble_buttons_layout.addWidget(self.scan_btn)
        ble_op_layout.addLayout(ble_buttons_layout)
        right_v_layout_tab1.addWidget(ble_op_group_widget)
        
        # Right side, group 2: data-processing option checkboxes.
        self.data_proc_label = QLabel("数据处理")
        data_proc_group_widget = QWidget()
        data_proc_layout = QVBoxLayout(data_proc_group_widget)
        data_proc_layout.addWidget(self.data_proc_label)
        self.send_newline_checkbox = QCheckBox("发送新行 (CRLF)")
        self.recv_clear_on_limit_checkbox = QCheckBox("接收超200KB清屏")
        self.hex_display_checkbox = QCheckBox("16进制显示")
        self.hex_send_checkbox = QCheckBox("16进制发送")
        data_proc_layout.addWidget(self.send_newline_checkbox)
        data_proc_layout.addWidget(self.recv_clear_on_limit_checkbox)
        h_layout_hex = QHBoxLayout()
        h_layout_hex.addWidget(self.hex_display_checkbox)
        h_layout_hex.addWidget(self.hex_send_checkbox)
        data_proc_layout.addLayout(h_layout_hex)
        right_v_layout_tab1.addWidget(data_proc_group_widget)
        
        # Right side, group 3: send area (period, loop toggle, text, buttons).
        self.send_op_v_layout_label = QLabel("数据发送")
        send_op_group_widget = QWidget()
        send_op_v_layout = QVBoxLayout(send_op_group_widget)
        send_op_v_layout.addWidget(self.send_op_v_layout_label)
        self.send_op_period_label = QLabel("周期:")
        send_op_controls_layout = QHBoxLayout()
        send_op_controls_layout.addWidget(self.send_op_period_label)
        # Period is entered as free text, default "500" (unit not stated here;
        # presumably milliseconds — confirm in toggle_periodic_send).
        self.period_edit = QLineEdit("500")
        self.period_edit.setFixedWidth(60)
        self.loop_send_checkbox = QCheckBox("循环发送")
        self.loop_send_checkbox.stateChanged.connect(self.toggle_periodic_send)
        send_op_controls_layout.addWidget(self.period_edit)
        send_op_controls_layout.addWidget(self.loop_send_checkbox)
        send_op_controls_layout.addStretch()
        send_op_v_layout.addLayout(send_op_controls_layout)
        self.send_text_edit = QTextEdit()
        self.send_text_edit.setFixedHeight(100)
        send_op_v_layout.addWidget(self.send_text_edit)
        self.clear_recv_btn = QPushButton("清除接收")
        # The lambda evaluates a tuple purely for its side effects, in order:
        # clear the receive view, zero receive_count, then call update_status
        # with the current status text (minus its "状态: " prefix) and r=0.
        self.clear_recv_btn.clicked.connect(lambda: (self.receive_text_edit.clear(), setattr(self, 'receive_count', 0), self.update_status(self.status_label.text().replace("状态: ",""), r=0)))
        self.clear_send_btn = QPushButton("清空发送")
        self.clear_send_btn.clicked.connect(self.send_text_edit.clear)
        self.send_data_btn = QPushButton("发送数据")
        self.send_data_btn.clicked.connect(self.send_data_from_ui)
        # Starts disabled; it is not re-enabled anywhere in this method.
        self.send_data_btn.setEnabled(False)
        button_grid_layout = QHBoxLayout()
        button_grid_layout.addWidget(self.clear_recv_btn)
        button_grid_layout.addWidget(self.clear_send_btn)
        send_op_v_layout.addLayout(button_grid_layout)
        send_op_v_layout.addWidget(self.send_data_btn)
        right_v_layout_tab1.addWidget(send_op_group_widget)
        right_v_layout_tab1.addStretch()
        
        # Horizontal splitter joining the left and right halves of tab 1.
        self.main_splitter_tab1 = QSplitter(Qt.Horizontal)
        self.main_splitter_tab1.addWidget(self.left_splitter)
        self.main_splitter_tab1.addWidget(self.right_pane_widget_tab1)
        ble_ops_main_layout.addWidget(self.main_splitter_tab1)
        
        # --- Tab 2: custom buttons (edit-mode toggle above a button grid) ---
        self.custom_buttons_tab = QWidget()
        self.tab_widget.addTab(self.custom_buttons_tab, "按键控制")
        custom_buttons_main_layout = QVBoxLayout(self.custom_buttons_tab)
        edit_controls_widget = QWidget()
        edit_mode_controls_layout = QHBoxLayout(edit_controls_widget)
        self.edit_mode_checkbox = QCheckBox("启用按键编辑模式 (点击按钮选择/移动目标,再次点击已选按钮进行编辑/删除)")
        self.edit_mode_checkbox.stateChanged.connect(self.toggle_edit_mode)
        edit_mode_controls_layout.addWidget(self.edit_mode_checkbox)
        edit_mode_controls_layout.addStretch()
        custom_buttons_main_layout.addWidget(edit_controls_widget)
        # Empty grid created here; no buttons are added to it in this method.
        self.custom_buttons_container = QWidget()
        self.custom_buttons_layout = QGridLayout(self.custom_buttons_container)
        self.custom_buttons_layout.setSpacing(10)
        custom_buttons_main_layout.addWidget(self.custom_buttons_container)
        custom_buttons_main_layout.addStretch()
        
        # --- Status bar: status text (stretches), R/S counters, version ---
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        self.status_label = QLabel("状态: 未初始化")
        self.rs_label = QLabel("R:0 S:0")
        self.version_label = QLabel("V:2.0.2") # Final formatting version
        self.status_bar.addWidget(self.status_label, 1)
        self.status_bar.addPermanentWidget(self.rs_label)
        self.status_bar.addPermanentWidget(self.version_label)
    # --- All other methods of BLESerialAssistant (load_settings, save_settings, toggle_edit_mode,
    #      recreate_custom_buttons_ui, handle_grid_cell_click_edit_mode, add_new_custom_button,
    #      edit_custom_button, handle_custom_button_pressed/released/timeout, send_custom_data,
    #      change_font_size, update_status, show_error_message, on_async_task_finished, start_scan,
    #      add_ble_device_to_list, connect_selected_device, on_ble_connection_success,
    #      on_ble_connection_failed, on_ble_disconnected, on_ble_unexpected_disconnected,
    #      _reset_connection_state, disconnect_current_device, send_data_from_ui,
    #      on_data_received, toggle_periodic_send, closeEvent)
    #      should be taken from the previous "生成修改后的完整代码" (the one that fixed AttributeError for disconnect_current_device
    #      AND had all the custom button logic and already corrected standard methods).
    #      The formatting of these methods was already correct there.
    #      I will re-paste them here with correct formatting.
   
    def load_settings(self):
        print(f"Loading settings from: {self.settings.fileName()}")
        geometry = self.settings.value("window/geometry")
        if geometry: self.restoreGeometry(geometry)
        else: self.setGeometry(100, 100, 980, 780)
        main_splitter_state_tab1 = self.settings.value("splitters/mainStateTab1")
        if main_splitter_state_tab1: self.main_splitter_tab1.restoreState(main_splitter_state_tab1)
        else: self.main_splitter_tab1.setSizes([int(self.width() * 0.55), int(self.width() * 0.45)])
        left_splitter_state_tab1 = self.settings.value("splitters/leftStateTab1")
        if left_splitter_state_tab1: self.left_splitter.restoreState(left_splitter_state_tab1)
        else: self.left_splitter.setSizes([int(self.left_splitter.height() * 0.4), int(self.left_splitter.height() * 0.6)])
        font_size = self.settings.value("appearance/fontSize", self.DEFAULT_FONT_SIZE, type=int)
        self.font_size_spinbox.setValue(font_size)
        self.send_newline_checkbox.setChecked(self.settings.value("options/sendNewline", False, type=bool))
        self.recv_clear_on_limit_checkbox.setChecked(self.settings.value("options/recvClearOnLimit", False, type=bool))
        self.hex_display_checkbox.setChecked(self.settings.value("options/hexDisplay", False, type=bool))
        self.hex_send_checkbox.setChecked(self.settings.value("options/hexSend", False, type=bool))
        self.period_edit.setText(self.settings.value("options/periodValue", "500", type=str))
        edit_mode_was_enabled = self.settings.value("customButtons/editMode", False, type=bool)
        self.edit_mode_checkbox.setChecked(edit_mode_was_enabled)
        self.toggle_edit_mode(self.edit_mode_checkbox.isChecked())
        self.custom_buttons_config = []
        size = self.settings.beginReadArray("customButtonsData")
        for i in range(size):
            self.settings.setArrayIndex(i)
            config = {"id": self.settings.value("id", f"btn_auto_{uuid.uuid4().hex[:8]}"),
                      "name": self.settings.value("name", f"Button {i+1}"),
                      "press_data": self.settings.value("press_data", ""),
                      "press_hex": self.settings.value("press_hex", False, type=bool),
                      "release_data": self.settings.value("release_data", ""),
                      "release_hex": self.settings.value("release_hex", False, type=bool),
                      "mode": self.settings.value("mode", "click"),
                      "long_press_interval": self.settings.value("long_press_interval", 500, type=int)}
            self.custom_buttons_config.append(config)
        self.settings.endArray()
        self.recreate_custom_buttons_ui()
        active_tab_index = self.settings.value("window/activeTab", 0, type=int)
        if 0 = len(self.custom_buttons_config): self.custom_buttons_config.append(config_to_move)
                else: self.custom_buttons_config.insert(target_flat_index, config_to_move)
                self.custom_buttons_config = self.custom_buttons_config[:self.MAX_CUSTOM_BUTTONS]
                self.selected_button_for_move_source_id = None
            self.recreate_custom_buttons_ui()
        elif button_id_at_cell and not is_placeholder:
            self.selected_button_for_move_source_id = button_id_at_cell
            self.recreate_custom_buttons_ui()
    def add_new_custom_button(self):
        if not self.edit_mode_enabled: return
        if len(self.custom_buttons_config) >= self.MAX_CUSTOM_BUTTONS:
            QMessageBox.information(self, "数量限制", f"最多只能添加 {self.MAX_CUSTOM_BUTTONS} 个自定义按钮。"); return
        dialog = ButtonConfigDialog(parent=self, is_new_button=True)
        if dialog.exec_() == QDialog.Accepted:
            new_config = dialog.get_config()
            if not new_config.get("name", "").strip(): QMessageBox.warning(self, "输入错误", "按钮名称不能为空!"); return
            new_config["id"] = f"custom_{uuid.uuid4().hex[:12]}"; self.custom_buttons_config.append(new_config)
            self.selected_button_for_move_source_id = new_config["id"]; self.recreate_custom_buttons_ui()
    def edit_custom_button(self, button_id_to_edit):
        if not self.edit_mode_enabled: return
        config_to_edit, index_to_edit = None, -1
        for i, cfg in enumerate(self.custom_buttons_config):
            if cfg.get("id") == button_id_to_edit: config_to_edit, index_to_edit = cfg, i; break
        if not config_to_edit: QMessageBox.warning(self, "错误", f"内部错误: 未找到按钮 {button_id_to_edit}"); return
        dialog = ButtonConfigDialog(button_config=config_to_edit, parent=self, is_new_button=False)
        if dialog.exec_() == QDialog.Accepted:
            updated_config = dialog.get_config()
            if updated_config.get("_to_be_deleted_"):
                self.custom_buttons_config.pop(index_to_edit)
                if button_id_to_edit in self.long_press_timers:
                    timer = self.long_press_timers.pop(button_id_to_edit); timer.stop(); timer.deleteLater()
                self.selected_button_for_move_source_id = None
            elif not updated_config.get("name", "").strip():
                QMessageBox.warning(self, "输入错误", "按钮名称不能为空!"); return
            else:
                updated_config["id"] = button_id_to_edit; self.custom_buttons_config[index_to_edit] = updated_config
            self.recreate_custom_buttons_ui()
    def handle_custom_button_pressed(self):
        if self.edit_mode_enabled: return
        sender_button = self.sender();
        if not isinstance(sender_button, QPushButton): return
        button_id = sender_button.property("button_id");
        if not button_id: return
        config = next((cfg for cfg in self.custom_buttons_config if cfg.get("id") == button_id), None)
        if not config: print(f"Error: Config not found for button ID {button_id}"); return
        if config.get("press_data", "").strip(): self.send_custom_data(config.get("press_data"), config.get("press_hex", False))
        if config.get("mode") == "long_press":
            if button_id in self.long_press_timers and self.long_press_timers[button_id].isActive(): self.long_press_timers[button_id].stop()
            timer = QTimer(self); timer.setObjectName(f"timer_{button_id}")
            timer.setInterval(config.get("long_press_interval", 500))
            timer.timeout.connect(lambda b_id=button_id, p_data=config.get("press_data"), p_hex=config.get("press_hex", False): self.handle_long_press_timeout(b_id, p_data, p_hex))
            timer.start(); self.long_press_timers[button_id] = timer
    def handle_custom_button_released(self):
        if self.edit_mode_enabled: return
        sender_button = self.sender()
        if not isinstance(sender_button, QPushButton): return
        button_id = sender_button.property("button_id")
        if not button_id: return
        config = next((cfg for cfg in self.custom_buttons_config if cfg.get("id") == button_id), None)
        if not config: return
        if config.get("mode") == "long_press":
            if button_id in self.long_press_timers:
                timer = self.long_press_timers[button_id]
                if timer.isActive(): timer.stop()
        if config.get("release_data", "").strip(): self.send_custom_data(config.get("release_data"), config.get("release_hex", False))
    def handle_long_press_timeout(self, button_id, press_data, press_hex):
        button_widget = self.findChild(QPushButton, button_id)
        if button_widget and not button_widget.isDown():
            if button_id in self.long_press_timers: self.long_press_timers[button_id].stop()
            return
        if press_data.strip(): self.send_custom_data(press_data, press_hex)
    def send_custom_data(self, data_str, is_hex):
        if not (self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected): print("Custom Send: Device not connected."); return
        if not data_str.strip(): return
        try:
            if is_hex:
                clean_hex = "".join(filter(lambda x: x in "0123456789abcdefABCDEF", data_str))
                if len(clean_hex) % 2 != 0: self.show_error_message(f"自定义发送错误: 无效的HEX长度 '{data_str[:20]}...'"); return
                if not clean_hex and data_str.strip(): self.show_error_message(f"自定义发送错误: 无效的HEX字符 '{data_str[:20]}...'"); return
                data_bytes = binascii.unhexlify(clean_hex) if clean_hex else b""
            else: data_bytes = data_str.encode('utf-8', errors='replace')
            if data_bytes:
                self.async_worker.send_ble_data(self.ble_client_ref, data_bytes)
                self.send_count += len(data_bytes); self.update_status(self.status_label.text().replace("状态: ",""), s=self.send_count)
        except binascii.Error: self.show_error_message(f"自定义发送错误: 无效的HEX数据 '{data_str[:20]}...'")
        except Exception as e: self.show_error_message(f"自定义发送错误: {str(e)}")
    def change_font_size_from_spinbox(self, size):
        self.change_font_size(size)
    def change_font_size(self, size):
        font = QFont(); font.setPointSize(size); QApplication.setFont(font)
        self.receive_text_edit.setFont(font); self.send_text_edit.setFont(font); self.device_list_widget.setFont(font)
        self.update()
    def update_status(self, message, r=None, s=None):
        self.status_label.setText(f"状态: {message}")
        if r is not None: self.receive_count = r
        if s is not None: self.send_count = s
        self.rs_label.setText(f"R:{self.receive_count} S:{self.send_count}")
    def show_error_message(self, message):
        QMessageBox.critical(self, "错误", message)
        current_status_base = self.status_label.text().replace("状态: ", "")
        if not ("已连接" in current_status_base or "未连接" in current_status_base or "扫描中" in current_status_base):
            self.update_status(f"错误: {message.split(':')[0] if ':' in message else message}")
        if not (self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected):
            self.scan_btn.setEnabled(True)
    def on_async_task_finished(self):
        if not (self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected):
            self.scan_btn.setEnabled(True)
    def start_scan(self):
        if self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected:
            self.show_error_message("请先断开当前连接的设备。"); return
        self.device_list_widget.clear(); self.devices_discovered.clear()
        self.update_status("扫描中 (BLE)..."); self.scan_btn.setEnabled(False)
        self.async_worker.scan_ble_devices()
    def add_ble_device_to_list(self, name, address):
        display_name = f"{name} ({address})"
        self.devices_discovered[display_name] = {"type": "BLE", "address": address, "name": name}
        self.device_list_widget.addItem(display_name)
    def connect_selected_device(self):
        selected_item_widget = self.device_list_widget.currentItem()
        if not selected_item_widget: self.show_error_message("请先选择一个设备。"); return
        selected_key = selected_item_widget.text()
        device_info = self.devices_discovered.get(selected_key)
        if not device_info: self.show_error_message("设备信息未找到。"); return
        if self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected: self.show_error_message("请先断开当前连接。"); return
        address = device_info["address"]; self.update_status(f"连接中至 {device_info['name']}...")
        self.scan_btn.setEnabled(False); self.disconnect_btn.setEnabled(True)
        self.async_worker.connect_ble_device(address, self.ble_client_ref)
    def on_ble_connection_success(self, device_name):
        self.update_status(f"BLE已连接: {device_name}")
        self.send_data_btn.setEnabled(True); self.disconnect_btn.setEnabled(True); self.scan_btn.setEnabled(False)
        if "错误" in self.status_label.text().replace("状态: ", ""):
            self.update_status(f"BLE已连接: {device_name}", r=self.receive_count, s=self.send_count)
    def on_ble_connection_failed(self, error_message):
        self.show_error_message(f"BLE连接失败: {error_message}")
        self.update_status(f"BLE连接失败"); self.ble_client_ref['client'] = None
        self._reset_connection_state()
    def on_ble_disconnected(self):
        self.update_status("BLE已断开连接"); self.ble_client_ref['client'] = None
        self._reset_connection_state()
    def on_ble_unexpected_disconnected(self, device_address_str):
        print(f"GUI: Unexpected disconnect for {device_address_str}")
        if self.ble_client_ref.get('client') is not None or "已连接" in self.status_label.text():
            QMessageBox.warning(self, "连接中断", f"与设备 {device_address_str} 的连接意外断开。")
            self.update_status(f"连接意外断开: {device_address_str}")
            self.ble_client_ref.clear(); self.ble_client_ref.update({'client': None, 'write_char_uuid': None, 'notify_char_uuid': None})
        self._reset_connection_state()
    def _reset_connection_state(self):
        print("GUI: Resetting connection state.")
        if self.send_timer.isActive(): self.loop_send_checkbox.setChecked(False)
        for tid in list(self.long_press_timers.keys()):
            timer = self.long_press_timers.pop(tid)
            if timer.isActive(): timer.stop()
            timer.deleteLater()
        self.send_data_btn.setEnabled(False); self.disconnect_btn.setEnabled(False)
        self.scan_btn.setEnabled(True)
    def disconnect_current_device(self):
        self.update_status("断开连接中...")
        if self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected:
            self.async_worker.disconnect_ble_device(self.ble_client_ref)
        else: self._reset_connection_state(); self.update_status("无活动连接或已断开")
    def send_data_from_ui(self):
        is_connected = self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected
        if not is_connected:
            if self.send_timer.isActive(): self.loop_send_checkbox.setChecked(False)
            self.show_error_message("设备未连接。"); return
        data_str = self.send_text_edit.toPlainText()
        if not data_str.strip() and not self.hex_send_checkbox.isChecked(): return
        try:
            if self.hex_send_checkbox.isChecked():
                clean_hex = "".join(filter(lambda x: x in "0123456789abcdefABCDEF", data_str))
                if len(clean_hex) % 2 != 0: self.show_error_message("16进制数据长度必须为偶数。"); return
                if not clean_hex and data_str.strip(): self.show_error_message("无效的16进制字符。"); return
                data_bytes = binascii.unhexlify(clean_hex) if clean_hex else b""
            else:
                processed_data_str = data_str
                if self.send_newline_checkbox.isChecked(): processed_data_str += "\r\n"
                data_bytes = processed_data_str.encode('utf-8', errors='replace')
            if data_bytes or (self.hex_send_checkbox.isChecked() and not data_str.strip()):
                self.async_worker.send_ble_data(self.ble_client_ref, data_bytes)
                self.send_count += len(data_bytes)
                self.update_status(self.status_label.text().replace("状态: ",""), s=self.send_count)
        except binascii.Error: self.show_error_message("无效的16进制字符。")
        except Exception as e: self.show_error_message(f"发送准备错误: {str(e)}")
    def on_data_received(self, data_bytes):
        if self.recv_clear_on_limit_checkbox.isChecked():
            if (len(self.receive_text_edit.toPlainText().encode('utf-8')) + len(data_bytes)) > 200 * 1024:
                self.receive_text_edit.clear(); self.receive_count = 0
                self.update_status(self.status_label.text().replace("状态: ",""), r=self.receive_count)
        current_display_text = ""
        if self.hex_display_checkbox.isChecked():
            hex_text = binascii.hexlify(data_bytes).decode('ascii').upper()
            current_display_text = ' '.join(hex_text[i:i+2] for i in range(0, len(hex_text), 2))
            if current_display_text: current_display_text += " "
        else:
            try: current_display_text = data_bytes.decode('utf-8', errors='replace')
            except UnicodeDecodeError: current_display_text = data_bytes.decode('latin-1', errors='replace')
        if current_display_text or data_bytes:
            self.receive_text_edit.moveCursor(QTextCursor.End)
            self.receive_text_edit.insertPlainText(current_display_text + "\n")
            self.receive_text_edit.ensureCursorVisible()
        self.receive_count += len(data_bytes)
        self.update_status(self.status_label.text().replace("状态: ",""), r=self.receive_count)
    def toggle_periodic_send(self, state_int_or_bool):
        state = bool(state_int_or_bool)
        is_connected = self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected
        if state:
            if not is_connected:
                self.show_error_message("请先连接设备再启动循环发送。"); self.loop_send_checkbox.setChecked(False); return
            try:
                period_ms = int(self.period_edit.text())
                if period_ms

蓝牙, 按钮

daoye9988   

请在帖子中插入部分关键代码
本版块仅限分享编程技术和源码相关内容,发布帖子必须带上关键代码和具体功能介绍
abaooo   

AI功能越来越强大了
zhiqingchun   

要安装一下 bleak 模块
zhiqingchun   

可以的,赞
您需要登录后才可以回帖 登录 | 立即注册

返回顶部