485-Python读写工具

查看 23|回复 1
作者:hjy52   
[Python] 纯文本查看 复制代码import struct
import time
import tkinter as tk
from tkinter import ttk, messagebox, StringVar
import serial
import re
class TouchscreenInterface:
    def __init__(self, root):
        self.root = root
        self.root.title("Modbus通信测试工具_Mango")
        self.root.geometry("950x750")  # 调整窗口大小以适应新功能
        self.root.resizable(True, True)
        # 串口连接状态
        self.connected = False
        self.serial_port = None
        # 创建6组寄存器变量
        self.register_addresses = [StringVar() for _ in range(6)]
        self.data_types = [StringVar(value="整数") for _ in range(6)]
        self.read_results = [StringVar() for _ in range(6)]
        self.write_datas = [StringVar() for _ in range(6)]
        # 定时任务相关变量
        self.timer_registers = [StringVar(), StringVar()]
        self.timer_start_values = [StringVar(), StringVar()]
        self.timer_steps = [StringVar(value="1"), StringVar(value="1")]
        self.timer_intervals = [StringVar(value="5"), StringVar(value="5")]
        self.timer_running = [False, False]
        self.timer_ids = [None, None]
        self.timer_current_values = [{"prefix": "", "number": 0, "length": 0},
                                     {"prefix": "", "number": 0, "length": 0}]
        # 创建界面
        self.create_widgets()
    def create_widgets(self):
        # 创建主框架
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        # 创建连接设置框架
        connection_frame = ttk.LabelFrame(main_frame, text="连接设置", padding="10")
        connection_frame.pack(fill=tk.X, padx=5, pady=5)
        # 串口设置
        ttk.Label(connection_frame, text="串口:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        self.com_port = StringVar(value="COM3")
        ttk.Entry(connection_frame, textvariable=self.com_port, width=10).grid(row=0, column=1, sticky=tk.W, padx=5,
                                                                               pady=5)
        # 波特率设置
        ttk.Label(connection_frame, text="波特率:").grid(row=0, column=2, sticky=tk.W, padx=5, pady=5)
        self.baud_rate = StringVar(value="38400")
        ttk.Entry(connection_frame, textvariable=self.baud_rate, width=10).grid(row=0, column=3, sticky=tk.W, padx=5,
                                                                                pady=5)
        # 485地址设置
        ttk.Label(connection_frame, text="485地址:").grid(row=0, column=4, sticky=tk.W, padx=5, pady=5)
        self.device_address = StringVar(value="1")
        ttk.Entry(connection_frame, textvariable=self.device_address, width=10).grid(row=0, column=5, sticky=tk.W,
                                                                                     padx=5, pady=5)
        # 连接按钮
        self.connect_button = ttk.Button(connection_frame, text="连接", command=self.toggle_connection)
        self.connect_button.grid(row=0, column=6, padx=10, pady=5)
        # 创建定时任务框架
        timer_frame = ttk.LabelFrame(main_frame, text="定时循环写入(支持自动递增)", padding="10")
        timer_frame.pack(fill=tk.X, padx=5, pady=5)
        # 定时任务1
        ttk.Label(timer_frame, text="寄存器地址:").grid(row=0, column=0, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_registers[0], width=10).grid(row=0, column=1, padx=5, pady=2)
        ttk.Label(timer_frame, text="起始值:").grid(row=0, column=2, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_start_values[0], width=15).grid(row=0, column=3, padx=5, pady=2)
        ttk.Label(timer_frame, text="步长:").grid(row=0, column=4, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_steps[0], width=5).grid(row=0, column=5, padx=5, pady=2)
        ttk.Label(timer_frame, text="间隔(秒):").grid(row=0, column=6, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_intervals[0], width=5).grid(row=0, column=7, padx=5, pady=2)
        self.timer_start_btn1 = ttk.Button(timer_frame, text="启动",
                                           command=lambda: self.start_timer(0))
        self.timer_start_btn1.grid(row=0, column=8, padx=5, pady=2)
        ttk.Button(timer_frame, text="停止",
                   command=lambda: self.stop_timer(0)).grid(row=0, column=9, padx=5, pady=2)
        # 定时任务2
        ttk.Label(timer_frame, text="寄存器地址:").grid(row=1, column=0, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_registers[1], width=10).grid(row=1, column=1, padx=5, pady=2)
        ttk.Label(timer_frame, text="起始值:").grid(row=1, column=2, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_start_values[1], width=15).grid(row=1, column=3, padx=5, pady=2)
        ttk.Label(timer_frame, text="步长:").grid(row=1, column=4, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_steps[1], width=5).grid(row=1, column=5, padx=5, pady=2)
        ttk.Label(timer_frame, text="间隔(秒):").grid(row=1, column=6, padx=5, pady=2)
        ttk.Entry(timer_frame, textvariable=self.timer_intervals[1], width=5).grid(row=1, column=7, padx=5, pady=2)
        self.timer_start_btn2 = ttk.Button(timer_frame, text="启动",
                                           command=lambda: self.start_timer(1))
        self.timer_start_btn2.grid(row=1, column=8, padx=5, pady=2)
        ttk.Button(timer_frame, text="停止",
                   command=lambda: self.stop_timer(1)).grid(row=1, column=9, padx=5, pady=2)
        # 创建数据操作框架
        data_frame = ttk.LabelFrame(main_frame, text="数据操作(最多支持6个寄存器同时操作)", padding="10")
        data_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # 创建表格标题
        ttk.Label(data_frame, text="寄存器地址(十进制)").grid(row=0, column=0, padx=5)
        ttk.Label(data_frame, text="数据类型").grid(row=0, column=1, padx=5)
        ttk.Label(data_frame, text="读取结果").grid(row=0, column=2, padx=5)
        ttk.Label(data_frame, text="写入数据").grid(row=0, column=3, padx=5)
        ttk.Label(data_frame, text="操作").grid(row=0, column=4, columnspan=2, padx=5)
        # 创建6行寄存器配置
        for i in range(6):
            # 寄存器地址
            ttk.Entry(data_frame, textvariable=self.register_addresses[i], width=12).grid(
                row=i + 1, column=0, padx=5, pady=2)
            # 数据类型
            data_type_combo = ttk.Combobox(data_frame, textvariable=self.data_types[i],
                                           values=["整数", "小数", "字符串"], state="readonly", width=8)
            data_type_combo.grid(row=i + 1, column=1, padx=5, pady=2)
            # 读取结果
            ttk.Label(data_frame, textvariable=self.read_results[i], width=18, relief="sunken").grid(
                row=i + 1, column=2, padx=5, pady=2)
            # 写入数据
            ttk.Entry(data_frame, textvariable=self.write_datas[i], width=22).grid(
                row=i + 1, column=3, padx=5, pady=2)
            # 单个操作按钮
            ttk.Button(data_frame, text="读取", width=6,
                       command=lambda idx=i: self.read_single_register(idx)).grid(
                row=i + 1, column=4, padx=2, pady=2)
            ttk.Button(data_frame, text="写入", width=6,
                       command=lambda idx=i: self.write_single_register(idx)).grid(
                row=i + 1, column=5, padx=2, pady=2)
        # 批量操作按钮
        ttk.Button(data_frame, text="批量读取", command=self.batch_read_data).grid(
            row=7, column=0, columnspan=2, pady=10, sticky=tk.EW)
        ttk.Button(data_frame, text="批量写入", command=self.batch_write_data).grid(
            row=7, column=2, columnspan=2, pady=10, sticky=tk.EW)
        # 创建日志框架
        log_frame = ttk.LabelFrame(main_frame, text="通信日志", padding="10")
        log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # 日志文本区域
        self.log_text = tk.Text(log_frame, height=15, width=100, wrap=tk.WORD)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        # 滚动条
        scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text.config(yscrollcommand=scrollbar.set)
        # 清除日志按钮
        ttk.Button(log_frame, text="清除日志", command=lambda: self.log_text.delete(1.0, tk.END)).pack(side=tk.BOTTOM,
                                                                                                       pady=5)
        # 初始化日志
        self.log("程序已启动,请配置连接参数并点击连接按钮")
    def toggle_connection(self):
        if not self.connected:
            self.connect()
        else:
            self.disconnect()
    def connect(self):
        try:
            com_port = self.com_port.get()
            baud_rate = int(self.baud_rate.get())
            self.serial_port = serial.Serial(
                port=com_port,
                baudrate=baud_rate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1
            )
            if self.serial_port.is_open:
                self.connected = True
                self.connect_button.config(text="断开")
                self.log(f"成功连接到 {com_port},波特率: {baud_rate}")
            else:
                self.log("无法打开串口连接")
        except Exception as e:
            self.log(f"连接错误: {str(e)}")
            messagebox.showerror("连接错误", str(e))
    def disconnect(self):
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
        self.connected = False
        self.connect_button.config(text="连接")
        self.log("已断开连接")
    def log(self, message):
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)
    def calculate_crc16(self, data):
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc = crc >> 1
        return bytes([crc & 0xFF, crc >> 8])
    def batch_read_data(self):
        for i in range(6):
            if self.register_addresses[i].get():
                self.read_single_register(i)
    def read_single_register(self, index):
        if not self.connected:
            messagebox.showwarning("警告", "请先连接设备")
            return
        try:
            register_address = int(self.register_addresses[index].get())
            device_address = int(self.device_address.get())
            data_type = self.data_types[index].get()
            if data_type == "整数":
                register_count = 1
            elif data_type == "小数":
                register_count = 2
            elif data_type == "字符串":
                register_count = 10
            command = bytes([
                device_address,
                0x03,
                register_address >> 8,
                register_address & 0xFF,
                0x00,
                register_count,
            ])
            command += self.calculate_crc16(command)
            self.serial_port.write(command)
            self.log(f"发送读取命令(寄存器{index + 1}): {command.hex(' ').upper()}")
            time.sleep(0.1)
            if self.serial_port.in_waiting:
                response = self.serial_port.read(self.serial_port.in_waiting)
                self.log(f"收到响应(寄存器{index + 1}): {response.hex(' ').upper()}")
                if len(response) >= 5:
                    if response[0] == device_address and response[1] == 0x03:
                        byte_count = response[2]
                        data_bytes = response[3:3 + byte_count]
                        if data_type == "整数":
                            if len(data_bytes) >= 2:
                                value = (data_bytes[0] = 4:
                                value = struct.unpack('>f', data_bytes)[0]
                                self.read_results[index].set(f"{value:.6f}")
                                self.log(f"寄存器{index + 1}读取到浮点数值: {value:.6f}")
                        elif data_type == "字符串":
                            adjusted_bytes = bytearray()
                            for i in range(0, len(data_bytes), 2):
                                chunk = data_bytes[i:i + 2]
                                if len(chunk) == 2:
                                    adjusted_bytes.extend(reversed(chunk))
                                else:
                                    adjusted_bytes.extend(chunk)
                            text = adjusted_bytes.decode('ascii', errors='replace').rstrip('\x00')
                            self.read_results[index].set(text)
                            self.log(f"寄存器{index + 1}读取到字符串: {text}")
                    else:
                        self.log(f"寄存器{index + 1}响应中的设备地址或功能码不匹配")
                else:
                    self.log(f"寄存器{index + 1}响应数据长度不足")
            else:
                self.log(f"寄存器{index + 1}未收到响应")
        except Exception as e:
            self.log(f"寄存器{index + 1}读取数据错误: {str(e)}")
            messagebox.showerror("读取错误", str(e))
    def batch_write_data(self):
        for i in range(6):
            if self.register_addresses[i].get() and self.write_datas[i].get():
                self.write_single_register(i)
    def write_single_register(self, index):
        if not self.connected:
            messagebox.showwarning("警告", "请先连接设备")
            return
        try:
            register_address = int(self.register_addresses[index].get())
            device_address = int(self.device_address.get())
            data_type = self.data_types[index].get()
            data_to_write = self.write_datas[index].get()
            if data_type == "整数":
                try:
                    value = int(data_to_write)
                    command = bytes([
                        device_address,
                        0x06,
                        register_address >> 8,
                        register_address & 0xFF,
                        value >> 8,
                        value & 0xFF,
                    ])
                except ValueError:
                    messagebox.showerror("输入错误", f"寄存器{index + 1}请输入有效的整数")
                    return
            elif data_type == "小数":
                try:
                    value = float(data_to_write)
                    float_bytes = struct.pack('>f', value)
                    command = bytes([
                        device_address,
                        0x10,
                        register_address >> 8,
                        register_address & 0xFF,
                        0x00, 0x02,
                        0x04,
                    ]) + float_bytes
                except ValueError:
                    messagebox.showerror("输入错误", f"寄存器{index + 1}请输入有效的浮点数")
                    return
            elif data_type == "字符串":
                text_bytes = data_to_write.encode('ascii', errors='replace')
                if len(text_bytes) % 2 != 0:
                    text_bytes += b'\x00'
                adjusted_bytes = bytearray()
                for i in range(0, len(text_bytes), 2):
                    chunk = text_bytes[i:i + 2]
                    if len(chunk) == 2:
                        adjusted_bytes.extend(reversed(chunk))
                    else:
                        adjusted_bytes.extend(chunk)
                register_count = len(adjusted_bytes) // 2
                if register_count > 10:
                    adjusted_bytes = adjusted_bytes[:20]
                    register_count = 10
                command = bytes([
                    device_address,
                    0x10,
                    register_address >> 8,
                    register_address & 0xFF,
                    register_count >> 8,
                    register_count & 0xFF,
                    len(adjusted_bytes),
                ]) + adjusted_bytes
            command += self.calculate_crc16(command)
            self.serial_port.write(command)
            self.log(f"发送写入命令(寄存器{index + 1}): {command.hex(' ').upper()}")
            time.sleep(0.1)
            if self.serial_port.in_waiting:
                response = self.serial_port.read(self.serial_port.in_waiting)
                self.log(f"收到响应(寄存器{index + 1}): {response.hex(' ').upper()}")
                if len(response) >= 8:
                    if response[0] == device_address and (response[1] == 0x06 or response[1] == 0x10):
                        self.log(f"寄存器{index + 1}数据写入成功")
                    else:
                        self.log(f"寄存器{index + 1}响应中的设备地址或功能码不匹配")
                else:
                    self.log(f"寄存器{index + 1}响应数据长度不足")
            else:
                self.log(f"寄存器{index + 1}未收到响应")
        except Exception as e:
            self.log(f"寄存器{index + 1}写入数据错误: {str(e)}")
            messagebox.showerror("写入错误", str(e))
    def start_timer(self, index):
        if self.timer_running[index]:
            return
        # 获取输入参数
        reg_addr = self.timer_registers[index].get()
        start_value = self.timer_start_values[index].get()
        step = self.timer_steps[index].get()
        interval = self.timer_intervals[index].get()
        # 验证输入
        if not reg_addr or not start_value or not step or not interval:
            messagebox.showwarning("输入错误", "请填写所有定时任务参数")
            return
        try:
            reg_addr = int(reg_addr)
            step = int(step)
            interval = float(interval)
        except ValueError:
            messagebox.showwarning("输入错误", "参数格式错误")
            return
        # 解析起始值中的数字部分
        parts = self.parse_increment_value(start_value)
        if not parts:
            messagebox.showwarning("错误", "起始值中未找到数字部分")
            return
        prefix, init_number, num_length = parts
        # 保存当前状态
        self.timer_current_values[index] = {
            "prefix": prefix,
            "number": init_number,
            "length": num_length,
            "step": step,
            "reg_addr": reg_addr
        }
        self.timer_running[index] = True
        self.log(f"定时任务{index + 1} 已启动,起始值: {start_value}")
        self.schedule_write(index, interval)
    def parse_increment_value(self, s):
        """解析字符串中的数字部分,返回(前缀,数字,数字长度)"""
        match = re.search(r'\d+$', s)
        if not match:
            return None
        number_str = match.group()
        return (
            s[:match.start()],  # 前缀
            int(number_str),  # 数字值
            len(number_str)  # 数字长度
        )
    def schedule_write(self, index, interval):
        if not self.timer_running[index]:
            return
        try:
            current = self.timer_current_values[index]
            # 生成新值
            new_number = current["number"] + current["step"]
            new_value = f"{current['prefix']}{new_number:0{int(current['length'])}d}"
            # 写入寄存器
            self.write_timer_data(index, current["reg_addr"], new_value)
            # 更新当前值
            self.timer_current_values[index]["number"] = new_number
            # 调度下一次写入
            self.timer_ids[index] = self.root.after(int(interval * 1000),
                                                    lambda: self.schedule_write(index, interval))
        except Exception as e:
            self.log(f"定时任务{index + 1} 发生错误: {str(e)}")
            self.stop_timer(index)
    def write_timer_data(self, index, reg_addr, value):
        """执行实际的写入操作"""
        if not self.connected:
            self.log("错误:未连接设备")
            self.stop_timer(index)
            return
        try:
            device_address = int(self.device_address.get())
            text_bytes = value.encode('ascii', errors='replace')
            # 处理字节对齐
            if len(text_bytes) % 2 != 0:
                text_bytes += b'\x00'
            # 调整字节顺序
            adjusted_bytes = bytearray()
            for i in range(0, len(text_bytes), 2):
                chunk = text_bytes[i:i + 2]
                adjusted_bytes.extend(reversed(chunk))
            # 构造Modbus命令
            reg_count = len(adjusted_bytes) // 2
            reg_count = min(reg_count, 10)  # 限制最大寄存器数量
            command = bytes([
                device_address,
                0x10,
                reg_addr >> 8,
                reg_addr & 0xFF,
                0x00,
                reg_count,
                len(adjusted_bytes)
            ]) + adjusted_bytes
            command += self.calculate_crc16(command)
            self.serial_port.write(command)
            self.log(f"定时任务{index + 1} 写入 {value} 到 {reg_addr}")
        except Exception as e:
            self.log(f"定时任务{index + 1} 写入失败: {str(e)}")
            raise
    def stop_timer(self, index):
        if self.timer_running[index]:
            self.root.after_cancel(self.timer_ids[index])
            self.timer_running[index] = False
            self.log(f"定时任务{index + 1} 已停止")
if __name__ == "__main__":
    root = tk.Tk()
    app = TouchscreenInterface(root)
    root.mainloop()

寄存器, 定时

guoxiujing   

有TCP网络连接的吗
您需要登录后才可以回帖 登录 | 立即注册

返回顶部