import asyncio
import binascii
import os
import sys
import threading
import uuid  # For generating unique IDs for buttons

from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
from PyQt5.QtCore import Qt, pyqtSignal, QObject, QTimer, QSettings, QCoreApplication
from PyQt5.QtGui import QTextCursor, QFont
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
    QTextEdit, QPushButton, QListWidget, QComboBox, QCheckBox,
    QLabel, QLineEdit, QSplitter, QMessageBox, QStatusBar,
    QSizePolicy, QSpinBox, QAction, QMenu, QWidgetAction,
    QDialog, QFormLayout, QDialogButtonBox, QRadioButton, QGridLayout,
    QTabWidget,
)
# --- Constants ---
# GATT UUIDs tried first when a device is connected. The connect routine refers
# to this service as "NUS" and falls back to a generic characteristic search
# when the service or either characteristic is missing.
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
# Characteristic the application writes outgoing data to.
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
# Characteristic the application subscribes to for incoming notifications.
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# --- Button Configuration Dialog ---
class ButtonConfigDialog(QDialog):
    """Modal form for creating or editing a single custom-button definition.

    The dialog edits a shallow copy of the configuration dict it is given and
    hands the result back through :meth:`get_config` once it has been accepted.
    When an existing button is edited an additional "delete" button is shown;
    confirming the deletion stores ``"_to_be_deleted_": True`` in the config
    and accepts the dialog without reading the form fields.
    """

    def __init__(self, button_config=None, parent=None, is_new_button=True):
        """Build the form and pre-fill it from ``button_config``.

        Args:
            button_config: Existing configuration dict; ``None`` or an empty
                dict starts from a blank form.
            parent: Optional parent widget.
            is_new_button: ``True`` when creating a button, ``False`` when
                editing one (which enables the delete button).
        """
        super().__init__(parent)
        self.is_new_button = is_new_button
        self.setWindowTitle("新建按钮" if self.is_new_button else "设置按钮")
        self.setMinimumWidth(400)
        self.config = button_config.copy() if button_config else {}

        form = QFormLayout(self)

        # Button caption.
        self.name_edit = QLineEdit(self.config.get("name", ""))
        form.addRow("按钮名称:", self.name_edit)

        # Payload sent when the button is pressed.
        self.press_data_label = QLabel("按下时发送的数据:")
        self.press_data_edit = QTextEdit()
        self.press_data_edit.setPlainText(self.config.get("press_data", ""))
        self.press_data_edit.setFixedHeight(80)
        self.press_data_edit.setAcceptRichText(False)
        form.addRow(self.press_data_label, self.press_data_edit)

        self.press_data_hex_checkbox = QCheckBox("HEX")
        self.press_data_hex_checkbox.setChecked(self.config.get("press_hex", False))
        press_hex_row = QHBoxLayout()
        press_hex_row.addWidget(self.press_data_hex_checkbox)
        press_hex_row.addStretch()
        form.addRow("", press_hex_row)

        # Optional payload sent when the button is released.
        self.release_data_label = QLabel("松开时发送的数据 (可选):")
        self.release_data_edit = QTextEdit()
        self.release_data_edit.setPlainText(self.config.get("release_data", ""))
        self.release_data_edit.setFixedHeight(60)
        self.release_data_edit.setAcceptRichText(False)
        form.addRow(self.release_data_label, self.release_data_edit)

        self.release_data_hex_checkbox = QCheckBox("HEX")
        self.release_data_hex_checkbox.setChecked(self.config.get("release_hex", False))
        release_hex_row = QHBoxLayout()
        release_hex_row.addWidget(self.release_data_hex_checkbox)
        release_hex_row.addStretch()
        form.addRow("", release_hex_row)

        # Send mode: single click versus repeated sending while held.
        mode_container = QWidget()
        mode_row = QHBoxLayout(mode_container)
        self.click_mode_radio = QRadioButton("点击模式")
        self.long_press_mode_radio = QRadioButton("长按模式")
        mode_row.addWidget(self.click_mode_radio)
        mode_row.addWidget(self.long_press_mode_radio)
        mode_row.addStretch()
        if self.config.get("mode", "click") == "long_press":
            initial_radio = self.long_press_mode_radio
        else:
            initial_radio = self.click_mode_radio
        initial_radio.setChecked(True)
        form.addRow("发送模式:", mode_container)

        # Repeat interval, only meaningful (and only enabled) in long-press mode.
        self.long_press_interval_spinbox = QSpinBox()
        self.long_press_interval_spinbox.setRange(50, 5000)
        self.long_press_interval_spinbox.setSingleStep(50)
        self.long_press_interval_spinbox.setSuffix(" ms")
        self.long_press_interval_spinbox.setValue(
            self.config.get("long_press_interval", 500)
        )
        form.addRow("长按发送间隔:", self.long_press_interval_spinbox)
        self.long_press_interval_spinbox.setEnabled(
            self.long_press_mode_radio.isChecked()
        )
        self.long_press_mode_radio.toggled.connect(
            self.long_press_interval_spinbox.setEnabled
        )
        self.click_mode_radio.toggled.connect(
            lambda checked: self.long_press_interval_spinbox.setDisabled(checked)
        )

        # OK / Cancel, plus a destructive "delete" button when editing.
        self.button_box = QDialogButtonBox()
        self.button_box.addButton(QDialogButtonBox.Ok)
        self.button_box.addButton(QDialogButtonBox.Cancel)
        if not self.is_new_button:
            delete_button = self.button_box.addButton(
                "删除此按钮", QDialogButtonBox.DestructiveRole
            )
            delete_button.clicked.connect(self.handle_delete_this_button_signal)
        self.button_box.accepted.connect(self.accept)
        self.button_box.rejected.connect(self.reject)
        form.addRow(self.button_box)

    def handle_delete_this_button_signal(self):
        """Ask for confirmation and, if given, flag the config for deletion."""
        shown_name = self.config.get('name', '此按钮')
        answer = QMessageBox.question(
            self,
            "确认删除",
            f"确定要永久删除按钮 '{shown_name}' 吗?",
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No,
        )
        if answer != QMessageBox.Yes:
            return
        self.config["_to_be_deleted_"] = True
        self.accept()

    def update_internal_config(self):
        """Copy the current form values into ``self.config``."""
        long_press_selected = self.long_press_mode_radio.isChecked()
        self.config.update({
            "name": self.name_edit.text().strip(),
            "press_data": self.press_data_edit.toPlainText(),
            "press_hex": self.press_data_hex_checkbox.isChecked(),
            "release_data": self.release_data_edit.toPlainText(),
            "release_hex": self.release_data_hex_checkbox.isChecked(),
            "mode": "long_press" if long_press_selected else "click",
            "long_press_interval": self.long_press_interval_spinbox.value(),
        })

    def accept(self):
        """Store the form values (unless deletion was requested) and close."""
        marked_for_deletion = self.config.get("_to_be_deleted_")
        if not marked_for_deletion:
            self.update_internal_config()
        super().accept()

    def get_config(self):
        """Return the dialog's working configuration dict."""
        return self.config
# --- AsyncWorker for BLE Operations ---
class AsyncWorker(QObject):
    """Bridge between the Qt GUI thread and Bleak's asyncio-based BLE API.

    A private asyncio event loop runs on a daemon thread.  The public,
    synchronous methods schedule coroutines on that loop; results are reported
    back exclusively through the Qt signals declared below.

    ``client_ref`` arguments are plain dicts with the keys ``'client'``,
    ``'write_char_uuid'`` and ``'notify_char_uuid'``.  The worker fills them in
    while a connection is alive and resets all three to ``None`` afterwards.
    """

    finished = pyqtSignal()                           # a scan or connect coroutine ended
    error = pyqtSignal(str)                           # generic error text
    ble_device_found_signal = pyqtSignal(str, str)    # (name, address)
    ble_connection_success_signal = pyqtSignal(str)   # device name or address
    ble_connection_failed_signal = pyqtSignal(str)    # failure description
    ble_disconnected_signal = pyqtSignal()            # requested disconnect completed
    ble_data_received_signal = pyqtSignal(bytes)      # notification payload
    ble_unexpected_disconnected_signal = pyqtSignal(str)  # device address

    _LOOP_NOT_RUNNING_MSG = "BLE async loop not running."

    def __init__(self):
        super().__init__()
        self.loop = None
        self.thread = None
        self.current_task = None
        # True while a disconnect was asked for (by the caller or by this
        # worker after a failed setup).  Bleak invokes the disconnect callback
        # for every disconnect, so this flag is what separates a requested
        # disconnect from a genuinely unexpected one.
        self._expected_disconnect = False

    # ------------------------------------------------------------------ helpers
    def _loop_is_running(self):
        """Return True when the background event loop exists and is running."""
        return self.loop is not None and self.loop.is_running()

    @staticmethod
    def _reset_client_ref(client_ref):
        """Put ``client_ref`` back into its "no connection" shape."""
        client_ref.clear()
        client_ref.update({'client': None, 'write_char_uuid': None, 'notify_char_uuid': None})

    @staticmethod
    def _find_characteristics(client):
        """Pick the write and notify characteristics to use for ``client``.

        The UART service constants are tried first.  Whatever is still missing
        afterwards is filled with the first characteristic (in service
        iteration order) that advertises ``"write"`` / ``"notify"``.

        Returns:
            ``(write_char, notify_char)``; either element may be ``None``.
        """
        write_char, notify_char = None, None
        try:
            nus_service = client.services.get_service(UART_SERVICE_UUID)
            if nus_service:
                write_char = nus_service.get_characteristic(UART_RX_CHAR_UUID)
                notify_char = nus_service.get_characteristic(UART_TX_CHAR_UUID)
                if write_char and notify_char:
                    print("Async BLE: Found NUS chars.")
        except Exception as e_n:
            print(f"Async BLE: NUS lookup: {e_n}")

        if write_char and notify_char:
            return write_char, notify_char

        print("Async BLE: NUS not full. Searching generic...")
        for service in client.services:
            for candidate in service.characteristics:
                if write_char is None and "write" in candidate.properties:
                    write_char = candidate
                if notify_char is None and "notify" in candidate.properties:
                    notify_char = candidate
            # Already-chosen characteristics are never replaced, so further
            # iteration cannot change the result once both are found.
            if write_char and notify_char:
                break
        return write_char, notify_char

    # --------------------------------------------------------------- event loop
    def _run_async_loop(self):
        """Thread target: create the event loop and run it until stopped."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_forever()
        finally:
            self.loop.close()

    def start_event_loop_thread(self):
        """Start the background loop thread unless one is already alive."""
        if self.thread is None or not self.thread.is_alive():
            self.thread = threading.Thread(target=self._run_async_loop, daemon=True)
            self.thread.start()

    def stop_event_loop_thread(self):
        """Ask the loop to stop and wait up to one second for the thread."""
        if self._loop_is_running():
            self.loop.call_soon_threadsafe(self.loop.stop)
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=1)
            if self.thread.is_alive():
                print("Warning: Async BLE thread did not terminate cleanly.")
        self.thread = None
        self.loop = None

    # --------------------------------------------------------------------- scan
    async def _scan_ble_devices(self):
        """Scan for five seconds and emit one signal per discovered device."""
        try:
            devices = await BleakScanner.discover(timeout=5.0)
            for device in devices:
                name = device.name if device.name else "Unknown Device"
                self.ble_device_found_signal.emit(name, device.address)
        except Exception as e:
            self.error.emit(f"BLE Scan error: {str(e)}")
        finally:
            self.finished.emit()

    def scan_ble_devices(self):
        """Schedule a scan; emits ``error`` if the loop is not running."""
        if self._loop_is_running():
            asyncio.run_coroutine_threadsafe(self._scan_ble_devices(), self.loop)
        else:
            self.error.emit(self._LOOP_NOT_RUNNING_MSG)

    # ------------------------------------------------------------------ connect
    async def _connect_ble_device(self, address, client_ref):
        """Connect to ``address``, start notifications and hold the link open.

        The coroutine only returns once the connection has ended (or could not
        be set up).  ``client_ref`` is populated while connected and reset in
        the ``finally`` clause.
        """
        self._expected_disconnect = False

        def handle_unexpected_disconnect(client_instance_backend):
            device_address_str = "未知设备"
            try:
                if hasattr(client_instance_backend, 'address'):
                    device_address_str = client_instance_backend.address
            except Exception as e_cb:
                print(f"BLEAK_CALLBACK: Error in disconnect handler: {e_cb}")
            if self._expected_disconnect:
                # Requested disconnects are reported through
                # ble_disconnected_signal / ble_connection_failed_signal.
                print(f"BLEAK_CALLBACK: Device {device_address_str} disconnected (expected).")
                return
            print(f"BLEAK_CALLBACK: Device {device_address_str} disconnected unexpectedly.")
            if self._loop_is_running():
                self.loop.call_soon_threadsafe(self.ble_unexpected_disconnected_signal.emit, device_address_str)
            else:
                self.ble_unexpected_disconnected_signal.emit(device_address_str)

        def fail(reason):
            # Leaving the ``async with`` block disconnects the client; mark
            # that disconnect as expected so it is not reported twice.
            self._expected_disconnect = True
            self.ble_connection_failed_signal.emit(reason)

        print(f"Async BLE: Connecting to {address}...")
        self._reset_client_ref(client_ref)
        try:
            # The callback is passed to the constructor; the separate
            # set_disconnected_callback() method is deprecated in Bleak.
            async with BleakClient(
                address,
                disconnected_callback=handle_unexpected_disconnect,
                timeout=15.0,
            ) as client:
                client_ref['client'] = client
                device_name_for_signal = getattr(client, 'name', None) or client.address
                print(f"Async BLE: Context for {client.address} ({device_name_for_signal})")
                print(f"Async BLE: Disconnected cb set for {client.address}")
                await asyncio.sleep(0.5)

                print(f"Async BLE: Discovering services for {client.address}...")
                try:
                    # Older Bleak releases expose get_services(); skip the call
                    # where the method no longer exists.
                    get_services = getattr(client, "get_services", None)
                    if get_services is not None:
                        await get_services()
                except Exception as e_gs:
                    print(f"Async BLE: Warn - Service discovery error: {e_gs}")

                print(f"Async BLE: Finding characteristics for {client.address}")
                write_char_obj, notify_char_obj = self._find_characteristics(client)
                if not write_char_obj:
                    fail(f"BLE: {device_name_for_signal} - No writable char")
                    return
                if not notify_char_obj:
                    fail(f"BLE: {device_name_for_signal} - No notifiable char")
                    return

                client_ref['write_char_uuid'] = write_char_obj.uuid
                client_ref['notify_char_uuid'] = notify_char_obj.uuid
                print(f"Async BLE: Using W:{write_char_obj.uuid}, N:{notify_char_obj.uuid}")

                try:
                    def notification_handler_wrapper(sender, data_):
                        self.ble_data_received_signal.emit(bytes(data_))

                    await client.start_notify(notify_char_obj.uuid, notification_handler_wrapper)
                    print(f"Async BLE: Notifications started on {notify_char_obj.uuid}.")
                except Exception as e_not:
                    fail(f"BLE: {device_name_for_signal} - Start notify failed: {str(e_not)[:50]}")
                    return

                self.ble_connection_success_signal.emit(device_name_for_signal)
                print(f"Async BLE: Connection OK for {client.address}. Keep-alive...")
                # Staying inside the context manager is what keeps the link up.
                while client.is_connected:
                    await asyncio.sleep(0.2)
                print(f"Async BLE: Keep-alive exited for {client.address}. Connected: {client.is_connected if client else 'N/A'}.")
        except BleakError as e_be:
            self.ble_connection_failed_signal.emit(f"BLE Connection Error: {str(e_be)}")
        except asyncio.TimeoutError:
            self.ble_connection_failed_signal.emit(f"BLE Connection Timeout: {address}")
        except Exception as e_gen:
            import traceback
            traceback.print_exc()
            self.ble_connection_failed_signal.emit(f"BLE Unknown Connection Error: {str(e_gen)}")
        finally:
            print(f"Async BLE: _connect_ble_device finally for {address}.")
            self._reset_client_ref(client_ref)
            if self._loop_is_running():
                self.finished.emit()
            print(f"Async BLE: _connect_ble_device for {address} fully finished (finally).")

    def connect_ble_device(self, address, client_ref):
        """Schedule a connection attempt.

        Emits ``ble_connection_failed_signal`` when the loop is not running so
        the caller is not left waiting for a result that will never arrive.
        """
        if self._loop_is_running():
            self.current_task = asyncio.run_coroutine_threadsafe(
                self._connect_ble_device(address, client_ref), self.loop
            )
        else:
            self.ble_connection_failed_signal.emit(self._LOOP_NOT_RUNNING_MSG)

    # --------------------------------------------------------------- disconnect
    async def _disconnect_ble_device(self, client_ref):
        """Stop notifications, disconnect and emit ``ble_disconnected_signal``."""
        client = client_ref.get('client')
        if client and client.is_connected:
            try:
                if client_ref.get('notify_char_uuid'):
                    await client.stop_notify(client_ref['notify_char_uuid'])
                await client.disconnect()
            except Exception as e_d:
                self.error.emit(f"BLE Disconnect error: {str(e_d)}")
            finally:
                self._reset_client_ref(client_ref)
                self.ble_disconnected_signal.emit()
        else:
            self.ble_disconnected_signal.emit()

    def disconnect_ble_device(self, client_ref):
        """Cancel the running connect task and schedule a clean disconnect."""
        # Set before anything is cancelled so the Bleak callback triggered by
        # this disconnect is not reported as unexpected.
        self._expected_disconnect = True
        if self._loop_is_running():
            if self.current_task and not self.current_task.done():
                self.current_task.cancel()
            asyncio.run_coroutine_threadsafe(self._disconnect_ble_device(client_ref), self.loop)
        else:
            self._reset_client_ref(client_ref)
            self.ble_disconnected_signal.emit()

    # --------------------------------------------------------------------- send
    async def _send_ble_data(self, client_ref, data):
        """Write ``data`` to the stored write characteristic."""
        client = client_ref.get('client')
        write_uuid = client_ref.get('write_char_uuid')
        if client and client.is_connected and write_uuid:
            try:
                await client.write_gatt_char(write_uuid, data)
            except Exception as e_s:
                self.error.emit(f"BLE Send error: {str(e_s)}")
        elif not client or not client.is_connected:
            self.error.emit("BLE: Not connected.")
        elif not write_uuid:
            self.error.emit("BLE: No write characteristic.")

    def send_ble_data(self, client_ref, data):
        """Schedule a write; emits ``error`` if the loop is not running."""
        if self._loop_is_running():
            asyncio.run_coroutine_threadsafe(self._send_ble_data(client_ref, data), self.loop)
        else:
            self.error.emit(self._LOOP_NOT_RUNNING_MSG)
# --- BLESerialAssistant Class (Main Application Window) ---
# Builds the tabbed UI, restores persisted settings and forwards user actions
# to the AsyncWorker, whose signals drive the UI state updates.
class BLESerialAssistant(QMainWindow):
# Fallback point size used when no "appearance/fontSize" value is stored.
DEFAULT_FONT_SIZE = 9
# Passed to QCoreApplication.setOrganizationName / setApplicationName in __init__.
ORGANIZATION_NAME = "MyPortableApps"
APPLICATION_NAME = "BLESerialAssistant"
# INI file name; __init__ joins it with the application directory for QSettings.
SETTINGS_FILENAME = "ble_config.ini"
# Upper bound on the number of custom buttons that can be configured.
MAX_CUSTOM_BUTTONS = 12
def __init__(self):
    """Create the main window, start the BLE worker and restore settings.

    The settings file lives next to the executable (frozen builds) or next to
    this source file, falling back to the current working directory.
    """
    super().__init__()
    self.setWindowTitle("PC BLE Serial Assistant")

    # Connection and UI state.
    self.ble_client_ref = {'client': None, 'write_char_uuid': None, 'notify_char_uuid': None}
    self.devices_discovered = {}
    self.receive_count = 0
    self.send_count = 0
    self.custom_buttons_config = []
    self.custom_button_widgets = []
    self.edit_mode_enabled = False
    self.long_press_timers = {}
    self.selected_button_for_move_source_id = None
    self.async_worker = AsyncWorker()

    QCoreApplication.setOrganizationName(self.ORGANIZATION_NAME)
    QCoreApplication.setApplicationName(self.APPLICATION_NAME)

    # ``sys`` must be imported at module level for this lookup to work.
    # ``__file__`` is read through globals() because it is not defined in
    # every execution context (e.g. an interactive session).
    module_file = globals().get("__file__")
    if getattr(sys, 'frozen', False):
        application_path = os.path.dirname(sys.executable)
    elif module_file:
        application_path = os.path.dirname(os.path.abspath(module_file))
    else:
        application_path = os.getcwd()
    self.settings_file_path = os.path.join(application_path, self.SETTINGS_FILENAME)
    self.settings = QSettings(self.settings_file_path, QSettings.IniFormat)

    # Created before the UI is built and settings are restored so that any
    # slot triggered during that phase can already rely on the timer existing.
    self.send_timer = QTimer(self)
    self.send_timer.timeout.connect(self.send_data_from_ui)

    self.init_ui_with_tabs()
    self.async_worker.start_event_loop_thread()
    self.load_settings()

    worker = self.async_worker
    worker.ble_device_found_signal.connect(self.add_ble_device_to_list)
    worker.ble_connection_success_signal.connect(self.on_ble_connection_success)
    worker.ble_connection_failed_signal.connect(self.on_ble_connection_failed)
    worker.ble_disconnected_signal.connect(self.on_ble_disconnected)
    worker.ble_unexpected_disconnected_signal.connect(self.on_ble_unexpected_disconnected)
    worker.ble_data_received_signal.connect(self.on_data_received)
    worker.error.connect(self.show_error_message)
    worker.finished.connect(self.on_async_task_finished)

    self.update_status("未连接 (Idle)", r=0, s=0)
def init_ui_with_tabs(self):
    """Create the menu bar, the two tabs and the status bar.

    The work is delegated to private builders, called in the same order in
    which the widgets were originally created.
    """
    self._build_view_menu()

    self.tab_widget = QTabWidget()
    self.setCentralWidget(self.tab_widget)

    self._build_ble_operations_tab()
    self._build_custom_buttons_tab()
    self._build_status_bar()

def _build_view_menu(self):
    """Add the "view" menu with the embedded font-size spin box."""
    menubar = self.menuBar()
    view_menu = menubar.addMenu('视图(&V)')
    font_size_menu = QMenu('字体大小', self)

    self.font_size_spinbox = QSpinBox()
    self.font_size_spinbox.setRange(7, 24)
    self.font_size_spinbox.setSuffix(" pt")
    self.font_size_spinbox.valueChanged.connect(self.change_font_size_from_spinbox)

    font_widget_action = QWidgetAction(self)
    font_widget_action.setDefaultWidget(self.font_size_spinbox)
    font_size_menu.addAction(font_widget_action)
    view_menu.addMenu(font_size_menu)

def _build_ble_operations_tab(self):
    """Build the first tab: device list and receive view beside the controls."""
    self.ble_operations_tab = QWidget()
    self.tab_widget.addTab(self.ble_operations_tab, "蓝牙串口")
    ble_ops_main_layout = QHBoxLayout(self.ble_operations_tab)

    self._build_left_pane()

    self.right_pane_widget_tab1 = QWidget()
    right_v_layout_tab1 = QVBoxLayout(self.right_pane_widget_tab1)
    right_v_layout_tab1.addWidget(self._build_ble_op_group())
    right_v_layout_tab1.addWidget(self._build_data_proc_group())
    right_v_layout_tab1.addWidget(self._build_send_group())
    right_v_layout_tab1.addStretch()

    self.main_splitter_tab1 = QSplitter(Qt.Horizontal)
    self.main_splitter_tab1.addWidget(self.left_splitter)
    self.main_splitter_tab1.addWidget(self.right_pane_widget_tab1)
    ble_ops_main_layout.addWidget(self.main_splitter_tab1)

def _build_left_pane(self):
    """Create the vertical splitter holding the device list and receive view."""
    self.left_splitter = QSplitter(Qt.Vertical)

    self.device_list_widget = QListWidget()
    self.device_list_widget.itemDoubleClicked.connect(self.connect_selected_device)
    self.receive_text_edit = QTextEdit()
    self.receive_text_edit.setReadOnly(True)

    self.left_v_layout1_label = QLabel("蓝牙列表 (BLE)")
    left_v_layout1_content = QWidget()
    left_v_layout1 = QVBoxLayout(left_v_layout1_content)
    left_v_layout1.addWidget(self.left_v_layout1_label)
    left_v_layout1.addWidget(self.device_list_widget)

    self.left_v_layout2_label = QLabel("数据接收")
    left_v_layout2_content = QWidget()
    left_v_layout2 = QVBoxLayout(left_v_layout2_content)
    left_v_layout2.addWidget(self.left_v_layout2_label)
    left_v_layout2.addWidget(self.receive_text_edit)

    self.left_splitter.addWidget(left_v_layout1_content)
    self.left_splitter.addWidget(left_v_layout2_content)

def _build_ble_op_group(self):
    """Return the widget holding the scan and disconnect buttons."""
    self.ble_op_label = QLabel("蓝牙操作 (BLE)")
    ble_op_group_widget = QWidget()
    ble_op_layout = QVBoxLayout(ble_op_group_widget)
    ble_op_layout.addWidget(self.ble_op_label)

    self.scan_type_fixed_label = QLabel("扫描蓝牙类型: BLE蓝牙")
    scan_type_layout = QHBoxLayout()
    scan_type_layout.addWidget(self.scan_type_fixed_label)
    ble_op_layout.addLayout(scan_type_layout)

    ble_buttons_layout = QHBoxLayout()
    self.disconnect_btn = QPushButton("断开蓝牙")
    self.disconnect_btn.clicked.connect(self.disconnect_current_device)
    self.disconnect_btn.setEnabled(False)
    self.scan_btn = QPushButton("扫描蓝牙")
    self.scan_btn.clicked.connect(self.start_scan)
    ble_buttons_layout.addWidget(self.disconnect_btn)
    ble_buttons_layout.addWidget(self.scan_btn)
    ble_op_layout.addLayout(ble_buttons_layout)
    return ble_op_group_widget

def _build_data_proc_group(self):
    """Return the widget holding the data-processing check boxes."""
    self.data_proc_label = QLabel("数据处理")
    data_proc_group_widget = QWidget()
    data_proc_layout = QVBoxLayout(data_proc_group_widget)
    data_proc_layout.addWidget(self.data_proc_label)

    self.send_newline_checkbox = QCheckBox("发送新行 (CRLF)")
    self.recv_clear_on_limit_checkbox = QCheckBox("接收超200KB清屏")
    self.hex_display_checkbox = QCheckBox("16进制显示")
    self.hex_send_checkbox = QCheckBox("16进制发送")
    data_proc_layout.addWidget(self.send_newline_checkbox)
    data_proc_layout.addWidget(self.recv_clear_on_limit_checkbox)

    h_layout_hex = QHBoxLayout()
    h_layout_hex.addWidget(self.hex_display_checkbox)
    h_layout_hex.addWidget(self.hex_send_checkbox)
    data_proc_layout.addLayout(h_layout_hex)
    return data_proc_group_widget

def _build_send_group(self):
    """Return the widget holding the send editor, period and send buttons."""
    self.send_op_v_layout_label = QLabel("数据发送")
    send_op_group_widget = QWidget()
    send_op_v_layout = QVBoxLayout(send_op_group_widget)
    send_op_v_layout.addWidget(self.send_op_v_layout_label)

    self.send_op_period_label = QLabel("周期:")
    send_op_controls_layout = QHBoxLayout()
    send_op_controls_layout.addWidget(self.send_op_period_label)
    self.period_edit = QLineEdit("500")
    self.period_edit.setFixedWidth(60)
    self.loop_send_checkbox = QCheckBox("循环发送")
    self.loop_send_checkbox.stateChanged.connect(self.toggle_periodic_send)
    send_op_controls_layout.addWidget(self.period_edit)
    send_op_controls_layout.addWidget(self.loop_send_checkbox)
    send_op_controls_layout.addStretch()
    send_op_v_layout.addLayout(send_op_controls_layout)

    self.send_text_edit = QTextEdit()
    self.send_text_edit.setFixedHeight(100)
    send_op_v_layout.addWidget(self.send_text_edit)

    self.clear_recv_btn = QPushButton("清除接收")
    self.clear_recv_btn.clicked.connect(self._clear_receive_area)
    self.clear_send_btn = QPushButton("清空发送")
    self.clear_send_btn.clicked.connect(self.send_text_edit.clear)
    self.send_data_btn = QPushButton("发送数据")
    self.send_data_btn.clicked.connect(self.send_data_from_ui)
    self.send_data_btn.setEnabled(False)

    button_grid_layout = QHBoxLayout()
    button_grid_layout.addWidget(self.clear_recv_btn)
    button_grid_layout.addWidget(self.clear_send_btn)
    send_op_v_layout.addLayout(button_grid_layout)
    send_op_v_layout.addWidget(self.send_data_btn)
    return send_op_group_widget

def _clear_receive_area(self, _checked=False):
    """Clear the receive view and reset the received-byte counter.

    ``_checked`` absorbs the bool that ``QPushButton.clicked`` passes along.
    """
    self.receive_text_edit.clear()
    self.receive_count = 0
    # Keep the current status message, only the counter changes.
    self.update_status(self.status_label.text().replace("状态: ", ""), r=0)

def _build_custom_buttons_tab(self):
    """Build the second tab: edit-mode toggle above the custom button grid."""
    self.custom_buttons_tab = QWidget()
    self.tab_widget.addTab(self.custom_buttons_tab, "按键控制")
    custom_buttons_main_layout = QVBoxLayout(self.custom_buttons_tab)

    edit_controls_widget = QWidget()
    edit_mode_controls_layout = QHBoxLayout(edit_controls_widget)
    self.edit_mode_checkbox = QCheckBox("启用按键编辑模式 (点击按钮选择/移动目标,再次点击已选按钮进行编辑/删除)")
    self.edit_mode_checkbox.stateChanged.connect(self.toggle_edit_mode)
    edit_mode_controls_layout.addWidget(self.edit_mode_checkbox)
    edit_mode_controls_layout.addStretch()
    custom_buttons_main_layout.addWidget(edit_controls_widget)

    self.custom_buttons_container = QWidget()
    self.custom_buttons_layout = QGridLayout(self.custom_buttons_container)
    self.custom_buttons_layout.setSpacing(10)
    custom_buttons_main_layout.addWidget(self.custom_buttons_container)
    custom_buttons_main_layout.addStretch()

def _build_status_bar(self):
    """Create the status bar with status text, R/S counters and version."""
    self.status_bar = QStatusBar()
    self.setStatusBar(self.status_bar)
    self.status_label = QLabel("状态: 未初始化")
    self.rs_label = QLabel("R:0 S:0")
    self.version_label = QLabel("V:2.0.2")
    self.status_bar.addWidget(self.status_label, 1)
    self.status_bar.addPermanentWidget(self.rs_label)
    self.status_bar.addPermanentWidget(self.version_label)
# --- Remaining BLESerialAssistant methods ---
# load_settings, save_settings, toggle_edit_mode, recreate_custom_buttons_ui,
# handle_grid_cell_click_edit_mode, add_new_custom_button, edit_custom_button,
# handle_custom_button_pressed/released, handle_long_press_timeout,
# send_custom_data, change_font_size, update_status, show_error_message,
# on_async_task_finished, start_scan, add_ble_device_to_list,
# connect_selected_device, on_ble_connection_success, on_ble_connection_failed,
# on_ble_disconnected, on_ble_unexpected_disconnected, _reset_connection_state,
# disconnect_current_device, send_data_from_ui, on_data_received,
# toggle_periodic_send, closeEvent.
def load_settings(self):
# Restore window geometry, splitter layout, font size, option check boxes and
# the custom-button definitions from self.settings (the QSettings INI file).
print(f"Loading settings from: {self.settings.fileName()}")
# Window geometry, with a fixed default when nothing is stored.
geometry = self.settings.value("window/geometry")
if geometry: self.restoreGeometry(geometry)
else: self.setGeometry(100, 100, 980, 780)
# Splitter states; defaults are 55/45 (horizontal) and 40/60 (vertical).
main_splitter_state_tab1 = self.settings.value("splitters/mainStateTab1")
if main_splitter_state_tab1: self.main_splitter_tab1.restoreState(main_splitter_state_tab1)
else: self.main_splitter_tab1.setSizes([int(self.width() * 0.55), int(self.width() * 0.45)])
left_splitter_state_tab1 = self.settings.value("splitters/leftStateTab1")
if left_splitter_state_tab1: self.left_splitter.restoreState(left_splitter_state_tab1)
else: self.left_splitter.setSizes([int(self.left_splitter.height() * 0.4), int(self.left_splitter.height() * 0.6)])
# Font size is applied by setting the spin box value.
font_size = self.settings.value("appearance/fontSize", self.DEFAULT_FONT_SIZE, type=int)
self.font_size_spinbox.setValue(font_size)
# Option check boxes and the periodic-send interval text.
self.send_newline_checkbox.setChecked(self.settings.value("options/sendNewline", False, type=bool))
self.recv_clear_on_limit_checkbox.setChecked(self.settings.value("options/recvClearOnLimit", False, type=bool))
self.hex_display_checkbox.setChecked(self.settings.value("options/hexDisplay", False, type=bool))
self.hex_send_checkbox.setChecked(self.settings.value("options/hexSend", False, type=bool))
self.period_edit.setText(self.settings.value("options/periodValue", "500", type=str))
# Edit mode: the handler is also called explicitly with the restored value.
edit_mode_was_enabled = self.settings.value("customButtons/editMode", False, type=bool)
self.edit_mode_checkbox.setChecked(edit_mode_was_enabled)
self.toggle_edit_mode(self.edit_mode_checkbox.isChecked())
# Custom buttons are stored as a QSettings array; entries without an "id"
# receive a freshly generated one.
self.custom_buttons_config = []
size = self.settings.beginReadArray("customButtonsData")
for i in range(size):
self.settings.setArrayIndex(i)
config = {"id": self.settings.value("id", f"btn_auto_{uuid.uuid4().hex[:8]}"),
"name": self.settings.value("name", f"Button {i+1}"),
"press_data": self.settings.value("press_data", ""),
"press_hex": self.settings.value("press_hex", False, type=bool),
"release_data": self.settings.value("release_data", ""),
"release_hex": self.settings.value("release_hex", False, type=bool),
"mode": self.settings.value("mode", "click"),
"long_press_interval": self.settings.value("long_press_interval", 500, type=int)}
self.custom_buttons_config.append(config)
self.settings.endArray()
self.recreate_custom_buttons_ui()
active_tab_index = self.settings.value("window/activeTab", 0, type=int)
# NOTE(review): the source is truncated/corrupted from the next line on. That
# line is not valid Python and uses names (config_to_move, target_flat_index)
# that are not defined in this method, so the statement that applies
# active_tab_index is missing here — restore the lost text from version control.
if 0 = len(self.custom_buttons_config): self.custom_buttons_config.append(config_to_move)
else: self.custom_buttons_config.insert(target_flat_index, config_to_move)
self.custom_buttons_config = self.custom_buttons_config[:self.MAX_CUSTOM_BUTTONS]
self.selected_button_for_move_source_id = None
self.recreate_custom_buttons_ui()
elif button_id_at_cell and not is_placeholder:
self.selected_button_for_move_source_id = button_id_at_cell
self.recreate_custom_buttons_ui()
def add_new_custom_button(self):
    """Open the configuration dialog and append a new custom button.

    Does nothing outside edit mode.  Refuses to go beyond
    ``MAX_CUSTOM_BUTTONS`` and rejects an empty button name.  The new button
    becomes the current selection and the button grid is rebuilt.
    """
    if not self.edit_mode_enabled:
        return

    if len(self.custom_buttons_config) >= self.MAX_CUSTOM_BUTTONS:
        QMessageBox.information(
            self, "数量限制", f"最多只能添加 {self.MAX_CUSTOM_BUTTONS} 个自定义按钮。"
        )
        return

    config_dialog = ButtonConfigDialog(parent=self, is_new_button=True)
    if config_dialog.exec_() != QDialog.Accepted:
        return

    created = config_dialog.get_config()
    if not created.get("name", "").strip():
        QMessageBox.warning(self, "输入错误", "按钮名称不能为空!")
        return

    created["id"] = f"custom_{uuid.uuid4().hex[:12]}"
    self.custom_buttons_config.append(created)
    self.selected_button_for_move_source_id = created["id"]
    self.recreate_custom_buttons_ui()
def edit_custom_button(self, button_id_to_edit):
    """Edit or delete the custom button identified by ``button_id_to_edit``.

    Does nothing outside edit mode.  After the dialog is accepted the button
    is either removed (together with its long-press timer) or replaced by the
    edited configuration, and the button grid is rebuilt.
    """
    if not self.edit_mode_enabled:
        return

    # First entry whose id matches, together with its list position.
    index_to_edit, config_to_edit = next(
        (
            (position, entry)
            for position, entry in enumerate(self.custom_buttons_config)
            if entry.get("id") == button_id_to_edit
        ),
        (-1, None),
    )
    if not config_to_edit:
        QMessageBox.warning(self, "错误", f"内部错误: 未找到按钮 {button_id_to_edit}")
        return

    config_dialog = ButtonConfigDialog(
        button_config=config_to_edit, parent=self, is_new_button=False
    )
    if config_dialog.exec_() != QDialog.Accepted:
        return

    edited = config_dialog.get_config()
    if edited.get("_to_be_deleted_"):
        self.custom_buttons_config.pop(index_to_edit)
        if button_id_to_edit in self.long_press_timers:
            obsolete_timer = self.long_press_timers.pop(button_id_to_edit)
            obsolete_timer.stop()
            obsolete_timer.deleteLater()
        self.selected_button_for_move_source_id = None
    elif not edited.get("name", "").strip():
        QMessageBox.warning(self, "输入错误", "按钮名称不能为空!")
        return
    else:
        edited["id"] = button_id_to_edit
        self.custom_buttons_config[index_to_edit] = edited
    self.recreate_custom_buttons_ui()
def handle_custom_button_pressed(self):
    """Slot for a custom button's ``pressed`` signal.

    Sends the button's press data once and, in long-press mode, starts a
    repeating timer that keeps sending it.  Ignored while edit mode is on.
    """
    if self.edit_mode_enabled:
        return
    sender_button = self.sender()
    if not isinstance(sender_button, QPushButton):
        return
    button_id = sender_button.property("button_id")
    if not button_id:
        return
    config = next(
        (cfg for cfg in self.custom_buttons_config if cfg.get("id") == button_id),
        None,
    )
    if not config:
        print(f"Error: Config not found for button ID {button_id}")
        return

    # Normalise a missing or None payload to "" so the timeout handler never
    # receives None.
    press_data = config.get("press_data") or ""
    press_hex = config.get("press_hex", False)
    if press_data.strip():
        self.send_custom_data(press_data, press_hex)

    if config.get("mode") != "long_press":
        return

    # Dispose of the timer left over from a previous press.  Overwriting the
    # dict entry alone would keep the old QTimer alive as a child of the
    # window, leaking one timer per press.
    previous_timer = self.long_press_timers.pop(button_id, None)
    if previous_timer is not None:
        previous_timer.stop()
        previous_timer.deleteLater()

    timer = QTimer(self)
    timer.setObjectName(f"timer_{button_id}")
    timer.setInterval(config.get("long_press_interval", 500))
    # Values are bound as defaults so the lambda keeps this press's payload.
    timer.timeout.connect(
        lambda b_id=button_id, p_data=press_data, p_hex=press_hex:
            self.handle_long_press_timeout(b_id, p_data, p_hex)
    )
    timer.start()
    self.long_press_timers[button_id] = timer
def handle_custom_button_released(self):
    """Slot for a custom button's ``released`` signal.

    Stops the button's long-press timer (long-press mode only) and sends the
    configured release data, if any.  Ignored while edit mode is on.
    """
    if self.edit_mode_enabled:
        return

    origin = self.sender()
    if not isinstance(origin, QPushButton):
        return

    button_id = origin.property("button_id")
    if not button_id:
        return

    matches = (
        entry for entry in self.custom_buttons_config
        if entry.get("id") == button_id
    )
    config = next(matches, None)
    if not config:
        return

    is_long_press = config.get("mode") == "long_press"
    if is_long_press and button_id in self.long_press_timers:
        repeat_timer = self.long_press_timers[button_id]
        if repeat_timer.isActive():
            repeat_timer.stop()

    if config.get("release_data", "").strip():
        self.send_custom_data(
            config.get("release_data"), config.get("release_hex", False)
        )
def handle_long_press_timeout(self, button_id, press_data, press_hex):
    """Repeat-send ``press_data`` while the button is still held down.

    Args:
        button_id: Id of the custom button; also used as the object name when
            looking up the button widget.
        press_data: Text to send; ``None`` or blank sends nothing.
        press_hex: Whether ``press_data`` is to be interpreted as hex.
    """
    button_widget = self.findChild(QPushButton, button_id)
    if button_widget and not button_widget.isDown():
        # The button is no longer held: stop repeating.
        if button_id in self.long_press_timers:
            self.long_press_timers[button_id].stop()
        return
    # Guard against None: a config without "press_data" reaches here as None,
    # and calling .strip() on it would raise on every timer tick.
    if press_data and press_data.strip():
        self.send_custom_data(press_data, press_hex)
def send_custom_data(self, data_str, is_hex):
    """Encode ``data_str`` and send it over the current BLE connection.

    Args:
        data_str: Text to send.  With ``is_hex`` every non-hex character is
            dropped and the remainder is decoded as hex byte pairs.
        is_hex: ``True`` to treat ``data_str`` as hex, otherwise UTF-8 text.

    Nothing is sent when no device is connected or the text is blank.
    Invalid hex input is reported through ``show_error_message``.
    """
    if not (self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected):
        print("Custom Send: Device not connected.")
        return
    if not data_str.strip():
        return

    hex_alphabet = "0123456789abcdefABCDEF"
    try:
        if not is_hex:
            payload = data_str.encode('utf-8', errors='replace')
        else:
            hex_digits = "".join(ch for ch in data_str if ch in hex_alphabet)
            if len(hex_digits) % 2 != 0:
                self.show_error_message(f"自定义发送错误: 无效的HEX长度 '{data_str[:20]}...'")
                return
            if not hex_digits and data_str.strip():
                self.show_error_message(f"自定义发送错误: 无效的HEX字符 '{data_str[:20]}...'")
                return
            payload = binascii.unhexlify(hex_digits) if hex_digits else b""

        if payload:
            self.async_worker.send_ble_data(self.ble_client_ref, payload)
            self.send_count += len(payload)
            current_message = self.status_label.text().replace("状态: ", "")
            self.update_status(current_message, s=self.send_count)
    except binascii.Error:
        self.show_error_message(f"自定义发送错误: 无效的HEX数据 '{data_str[:20]}...'")
    except Exception as e:
        self.show_error_message(f"自定义发送错误: {str(e)}")
def change_font_size_from_spinbox(self, size):
    """Slot for the font-size spin box: apply the newly selected point size."""
    self.change_font_size(size)
def change_font_size(self, size):
    """Apply ``size`` (in points) application-wide and to the main text widgets."""
    new_font = QFont()
    new_font.setPointSize(size)
    QApplication.setFont(new_font)
    # These widgets are set explicitly in addition to the application font.
    for widget in (self.receive_text_edit, self.send_text_edit, self.device_list_widget):
        widget.setFont(new_font)
    self.update()
def update_status(self, message, r=None, s=None):
    """Show ``message`` in the status bar and refresh the R/S counters.

    Args:
        message: Status text, shown after the fixed status prefix.
        r: New received-byte count; ``None`` keeps the current value.
        s: New sent-byte count; ``None`` keeps the current value.
    """
    self.status_label.setText(f"状态: {message}")
    self.receive_count = self.receive_count if r is None else r
    self.send_count = self.send_count if s is None else s
    self.rs_label.setText(f"R:{self.receive_count} S:{self.send_count}")
def show_error_message(self, message):
    """Show ``message`` in a modal error box and reflect it in the status bar.

    The status text is only replaced when it does not currently show a
    connected, not-connected or scanning state.  The scan button is re-enabled
    whenever no device is connected.
    """
    QMessageBox.critical(self, "错误", message)

    status_text = self.status_label.text().replace("状态: ", "")
    protected_states = ("已连接", "未连接", "扫描中")
    if not any(marker in status_text for marker in protected_states):
        # Only the part before the first colon is shown in the status bar.
        headline = message.partition(':')[0]
        self.update_status(f"错误: {headline}")

    if not (self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected):
        self.scan_btn.setEnabled(True)
def on_async_task_finished(self):
    """Re-enable the scan button after a worker task unless a device is connected."""
    if self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected:
        return
    self.scan_btn.setEnabled(True)
def start_scan(self):
    """Clear the device list and start a BLE scan.

    Refused with an error message while a device is connected.
    """
    already_connected = (
        self.ble_client_ref.get('client')
        and self.ble_client_ref['client'].is_connected
    )
    if already_connected:
        self.show_error_message("请先断开当前连接的设备。")
        return

    self.device_list_widget.clear()
    self.devices_discovered.clear()
    self.update_status("扫描中 (BLE)...")
    self.scan_btn.setEnabled(False)
    self.async_worker.scan_ble_devices()
def add_ble_device_to_list(self, name, address):
    """Record a discovered device and append it to the device list widget.

    The list entry text doubles as the key into ``devices_discovered``.
    """
    entry_text = f"{name} ({address})"
    device_record = {"type": "BLE", "address": address, "name": name}
    self.devices_discovered[entry_text] = device_record
    self.device_list_widget.addItem(entry_text)
def connect_selected_device(self):
    """Start connecting to the device currently selected in the list.

    Shows an error and returns when nothing is selected, when the selection
    has no stored device info, or when a device is already connected.
    """
    current_item = self.device_list_widget.currentItem()
    if not current_item:
        self.show_error_message("请先选择一个设备。")
        return

    device_info = self.devices_discovered.get(current_item.text())
    if not device_info:
        self.show_error_message("设备信息未找到。")
        return

    if self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected:
        self.show_error_message("请先断开当前连接。")
        return

    address = device_info["address"]
    self.update_status(f"连接中至 {device_info['name']}...")
    self.scan_btn.setEnabled(False)
    self.disconnect_btn.setEnabled(True)
    self.async_worker.connect_ble_device(address, self.ble_client_ref)
def on_ble_connection_success(self, device_name):
    """Switch the UI into the connected state.

    Sets the status line to "BLE已连接: <device_name>", enables the send
    and disconnect buttons and disables scanning.

    The previous version re-read the status label right after writing it
    and, if it contained "错误", wrote the same text again with the current
    counters.  Because the label had just been overwritten, that check
    could only match when the device name itself contained "错误", and even
    then it repeated the same update; it has been removed.

    Args:
        device_name: Name of the device that was connected.
    """
    self.update_status(f"BLE已连接: {device_name}")
    self.send_data_btn.setEnabled(True)
    self.disconnect_btn.setEnabled(True)
    self.scan_btn.setEnabled(False)
def on_ble_connection_failed(self, error_message):
    """React to a failed BLE connection attempt.

    Shows the failure in an error dialog, sets the status line, forgets the
    client object and resets the connection-dependent UI state.

    Args:
        error_message: Failure description appended to the dialog text.
    """
    dialog_text = f"BLE连接失败: {error_message}"
    self.show_error_message(dialog_text)
    self.update_status("BLE连接失败")
    # Only the 'client' entry is cleared; other keys of the dict are kept.
    self.ble_client_ref['client'] = None
    self._reset_connection_state()
def on_ble_disconnected(self):
    """React to a completed disconnect.

    Updates the status line, forgets the client object and resets the
    connection-dependent UI state.
    """
    self.update_status("BLE已断开连接")
    # Only the 'client' entry is cleared; other keys of the dict are kept.
    self.ble_client_ref['client'] = None
    self._reset_connection_state()
# Handle a BLE link that dropped without a disconnect being requested.
# Prints the address to stdout. If a client object is still registered, or the
# status label still contains "已连接", the user gets a warning box and the
# status line is updated with the device address.
# NOTE(review): indentation is not recoverable in this view, so it is unclear
# whether the dict reset and _reset_connection_state() below run
# unconditionally or only inside the `if` - confirm against the original file.
def on_ble_unexpected_disconnected(self, device_address_str):
print(f"GUI: Unexpected disconnect for {device_address_str}")
# Short-circuit: the label text is only inspected when no client is registered.
if self.ble_client_ref.get('client') is not None or "已连接" in self.status_label.text():
QMessageBox.warning(self, "连接中断", f"与设备 {device_address_str} 的连接意外断开。")
self.update_status(f"连接意外断开: {device_address_str}")
# The dict is emptied and re-seeded in place rather than rebound; the same dict
# object is handed to the async worker elsewhere in this class.
self.ble_client_ref.clear(); self.ble_client_ref.update({'client': None, 'write_char_uuid': None, 'notify_char_uuid': None})
self._reset_connection_state()
def _reset_connection_state(self):
    """Return connection-dependent widgets and timers to the idle state.

    Unticks the loop-send checkbox if the send timer is running, stops and
    schedules deletion of every long-press timer (emptying the dict), then
    disables the send and disconnect buttons and enables scanning.
    """
    print("GUI: Resetting connection state.")

    if self.send_timer.isActive():
        self.loop_send_checkbox.setChecked(False)

    # Iterate over a snapshot of the keys because entries are popped as we go.
    for timer_id in tuple(self.long_press_timers):
        pending = self.long_press_timers.pop(timer_id)
        if pending.isActive():
            pending.stop()
        pending.deleteLater()

    idle_states = (
        (self.send_data_btn, False),
        (self.disconnect_btn, False),
        (self.scan_btn, True),
    )
    for button, enabled in idle_states:
        button.setEnabled(enabled)
def disconnect_current_device(self):
    """Disconnect the current device, or just reset the UI if none is connected.

    The status line is first set to "disconnecting".  With a connected
    client the async worker is asked to disconnect; otherwise the
    connection state is reset locally and the status line says that there
    is no active connection.
    """
    self.update_status("断开连接中...")

    client = self.ble_client_ref.get('client')
    if not (client and client.is_connected):
        self._reset_connection_state()
        self.update_status("无活动连接或已断开")
        return

    self.async_worker.disconnect_ble_device(self.ble_client_ref)
def send_data_from_ui(self):
    """Encode the send box contents and pass the bytes to the BLE worker.

    Without a connected client an error dialog is shown (and the loop-send
    checkbox is unticked if the send timer is running).  In HEX mode every
    character that is not a hex digit is discarded before decoding; an odd
    number of digits, or non-blank input without any hex digit, is reported
    as an error.  In text mode the text is UTF-8 encoded, with "\\r\\n"
    appended when the newline checkbox is ticked; blank text is ignored.
    After handing the payload to the worker the sent-byte counter is
    increased and the status line refreshed.
    """
    client = self.ble_client_ref.get('client')
    if not (client and client.is_connected):
        if self.send_timer.isActive():
            self.loop_send_checkbox.setChecked(False)
        self.show_error_message("设备未连接。")
        return

    raw_text = self.send_text_edit.toPlainText()
    # Blank input is only passed on in HEX mode.
    if not (raw_text.strip() or self.hex_send_checkbox.isChecked()):
        return

    try:
        hex_mode = self.hex_send_checkbox.isChecked()
        if hex_mode:
            # Separators and any other non-hex characters are dropped.
            hex_digits = "".join(
                ch for ch in raw_text if ch in "0123456789abcdefABCDEF"
            )
            if len(hex_digits) % 2 != 0:
                self.show_error_message("16进制数据长度必须为偶数。")
                return
            if not hex_digits and raw_text.strip():
                self.show_error_message("无效的16进制字符。")
                return
            payload = binascii.unhexlify(hex_digits) if hex_digits else b""
        else:
            if self.send_newline_checkbox.isChecked():
                outgoing = raw_text + "\r\n"
            else:
                outgoing = raw_text
            payload = outgoing.encode('utf-8', errors='replace')

        # An empty payload is still sent for blank input in HEX mode.
        if payload or (hex_mode and not raw_text.strip()):
            self.async_worker.send_ble_data(self.ble_client_ref, payload)
            self.send_count += len(payload)
            status_text = self.status_label.text().replace("状态: ", "")
            self.update_status(status_text, s=self.send_count)
    except binascii.Error:
        self.show_error_message("无效的16进制字符。")
    except Exception as exc:
        self.show_error_message(f"发送准备错误: {str(exc)}")
def on_data_received(self, data_bytes):
    """Append a received packet to the receive view and update the counter.

    When the clear-on-limit checkbox is ticked and the UTF-8 size of the
    text already shown plus the new packet exceeds 200 KiB, the view is
    cleared and the receive counter restarts at zero.  The packet is then
    rendered either as upper-case hex pairs separated by spaces or as UTF-8
    text, and appended followed by a newline.

    The previous version wrapped the UTF-8 decode in
    ``except UnicodeDecodeError`` with a latin-1 fallback.  That handler
    could never run, because ``errors='replace'`` substitutes U+FFFD for
    undecodable bytes instead of raising; it has been removed.

    NOTE: each packet is decoded on its own, so a multi-byte character
    split across two packets is shown as replacement characters.

    Args:
        data_bytes: Bytes-like payload of one received packet.
    """
    if self.recv_clear_on_limit_checkbox.isChecked():
        shown_size = len(self.receive_text_edit.toPlainText().encode('utf-8'))
        if shown_size + len(data_bytes) > 200 * 1024:
            self.receive_text_edit.clear()
            self.receive_count = 0
            self.update_status(self.status_label.text().replace("状态: ", ""), r=self.receive_count)

    if self.hex_display_checkbox.isChecked():
        hex_text = binascii.hexlify(data_bytes).decode('ascii').upper()
        display_text = ' '.join(hex_text[i:i + 2] for i in range(0, len(hex_text), 2))
        if display_text:
            display_text += " "
    else:
        # errors='replace' never raises UnicodeDecodeError.
        display_text = data_bytes.decode('utf-8', errors='replace')

    if display_text or data_bytes:
        # Always append at the end, regardless of where the cursor currently is.
        self.receive_text_edit.moveCursor(QTextCursor.End)
        self.receive_text_edit.insertPlainText(display_text + "\n")
        self.receive_text_edit.ensureCursorVisible()

    self.receive_count += len(data_bytes)
    self.update_status(self.status_label.text().replace("状态: ", ""), r=self.receive_count)
def toggle_periodic_send(self, state_int_or_bool):
state = bool(state_int_or_bool)
is_connected = self.ble_client_ref.get('client') and self.ble_client_ref['client'].is_connected
if state:
if not is_connected:
self.show_error_message("请先连接设备再启动循环发送。"); self.loop_send_checkbox.setChecked(False); return
try:
period_ms = int(self.period_edit.text())
if period_ms