import re
import struct
import time
import tkinter as tk
from tkinter import ttk, messagebox, StringVar

import serial
class TouchscreenInterface:
    """Tkinter front end for exercising a Modbus RTU device over a serial port.

    The window offers:

    * connection settings (serial port, baud rate, device address);
    * two timer tasks that periodically write an auto-incrementing string
      (fixed prefix + zero-padded counter) to a register;
    * six register rows that can be read or written individually or in batch
      as an integer (1 register), a float (2 registers) or a string (up to
      10 registers);
    * a scrolling communication log.

    All serial I/O runs synchronously on the Tk main thread.
    """

    # Number of register rows shown in the data-operation table.
    REGISTER_ROWS = 6
    # Number of independent timer tasks.
    TIMER_COUNT = 2
    # Largest number of registers used for one string value.
    MAX_STRING_REGISTERS = 10
    # Registers requested from the device for each selectable data type.
    READ_REGISTER_COUNTS = {"整数": 1, "小数": 2, "字符串": 10}

    def __init__(self, root):
        """Configure the main window, create the Tk variables and build the UI.

        Args:
            root: The Tk root (or toplevel) window hosting the interface.
        """
        self.root = root
        self.root.title("Modbus通信测试工具_Mango")
        self.root.geometry("950x750")
        self.root.resizable(True, True)

        # Serial connection state.
        self.connected = False
        self.serial_port = None

        # One Tk variable per register row and column.
        rows = range(self.REGISTER_ROWS)
        self.register_addresses = [StringVar() for _ in rows]
        self.data_types = [StringVar(value="整数") for _ in rows]
        self.read_results = [StringVar() for _ in rows]
        self.write_datas = [StringVar() for _ in rows]

        # Timer task inputs and runtime state.
        self.timer_registers = [StringVar(), StringVar()]
        self.timer_start_values = [StringVar(), StringVar()]
        self.timer_steps = [StringVar(value="1"), StringVar(value="1")]
        self.timer_intervals = [StringVar(value="5"), StringVar(value="5")]
        self.timer_running = [False, False]
        self.timer_ids = [None, None]
        self.timer_current_values = [{"prefix": "", "number": 0, "length": 0},
                                     {"prefix": "", "number": 0, "length": 0}]

        self.create_widgets()

    # ------------------------------------------------------------------ UI

    def create_widgets(self):
        """Build every frame of the window and write the start-up log line."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        self._build_connection_frame(main_frame)
        self._build_timer_frame(main_frame)
        self._build_data_frame(main_frame)
        self._build_log_frame(main_frame)

        self.log("程序已启动,请配置连接参数并点击连接按钮")

    def _build_connection_frame(self, parent):
        """Create the serial port / baud rate / device address inputs."""
        frame = ttk.LabelFrame(parent, text="连接设置", padding="10")
        frame.pack(fill=tk.X, padx=5, pady=5)

        self.com_port = StringVar(value="COM3")
        self.baud_rate = StringVar(value="38400")
        self.device_address = StringVar(value="1")

        fields = (
            ("串口:", self.com_port),
            ("波特率:", self.baud_rate),
            ("485地址:", self.device_address),
        )
        for position, (caption, variable) in enumerate(fields):
            ttk.Label(frame, text=caption).grid(
                row=0, column=2 * position, sticky=tk.W, padx=5, pady=5)
            ttk.Entry(frame, textvariable=variable, width=10).grid(
                row=0, column=2 * position + 1, sticky=tk.W, padx=5, pady=5)

        self.connect_button = ttk.Button(frame, text="连接", command=self.toggle_connection)
        self.connect_button.grid(row=0, column=6, padx=10, pady=5)

    def _build_timer_frame(self, parent):
        """Create one input row with start/stop buttons per timer task."""
        frame = ttk.LabelFrame(parent, text="定时循环写入(支持自动递增)", padding="10")
        frame.pack(fill=tk.X, padx=5, pady=5)

        start_buttons = []
        for row in range(self.TIMER_COUNT):
            fields = (
                ("寄存器地址:", self.timer_registers[row], 10),
                ("起始值:", self.timer_start_values[row], 15),
                ("步长:", self.timer_steps[row], 5),
                ("间隔(秒):", self.timer_intervals[row], 5),
            )
            for position, (caption, variable, width) in enumerate(fields):
                ttk.Label(frame, text=caption).grid(
                    row=row, column=2 * position, padx=5, pady=2)
                ttk.Entry(frame, textvariable=variable, width=width).grid(
                    row=row, column=2 * position + 1, padx=5, pady=2)

            # Bind the row index as a default so each button keeps its own task.
            start_button = ttk.Button(frame, text="启动",
                                      command=lambda idx=row: self.start_timer(idx))
            start_button.grid(row=row, column=8, padx=5, pady=2)
            start_buttons.append(start_button)
            ttk.Button(frame, text="停止",
                       command=lambda idx=row: self.stop_timer(idx)).grid(
                row=row, column=9, padx=5, pady=2)

        self.timer_start_btn1, self.timer_start_btn2 = start_buttons

    def _build_data_frame(self, parent):
        """Create the register table and the batch read/write buttons."""
        frame = ttk.LabelFrame(parent, text="数据操作(最多支持6个寄存器同时操作)", padding="10")
        frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        headers = ("寄存器地址(十进制)", "数据类型", "读取结果", "写入数据")
        for column, caption in enumerate(headers):
            ttk.Label(frame, text=caption).grid(row=0, column=column, padx=5)
        ttk.Label(frame, text="操作").grid(row=0, column=4, columnspan=2, padx=5)

        for i in range(self.REGISTER_ROWS):
            grid_row = i + 1
            ttk.Entry(frame, textvariable=self.register_addresses[i], width=12).grid(
                row=grid_row, column=0, padx=5, pady=2)
            ttk.Combobox(frame, textvariable=self.data_types[i],
                         values=["整数", "小数", "字符串"], state="readonly", width=8).grid(
                row=grid_row, column=1, padx=5, pady=2)
            ttk.Label(frame, textvariable=self.read_results[i], width=18, relief="sunken").grid(
                row=grid_row, column=2, padx=5, pady=2)
            ttk.Entry(frame, textvariable=self.write_datas[i], width=22).grid(
                row=grid_row, column=3, padx=5, pady=2)
            ttk.Button(frame, text="读取", width=6,
                       command=lambda idx=i: self.read_single_register(idx)).grid(
                row=grid_row, column=4, padx=2, pady=2)
            ttk.Button(frame, text="写入", width=6,
                       command=lambda idx=i: self.write_single_register(idx)).grid(
                row=grid_row, column=5, padx=2, pady=2)

        ttk.Button(frame, text="批量读取", command=self.batch_read_data).grid(
            row=7, column=0, columnspan=2, pady=10, sticky=tk.EW)
        ttk.Button(frame, text="批量写入", command=self.batch_write_data).grid(
            row=7, column=2, columnspan=2, pady=10, sticky=tk.EW)

    def _build_log_frame(self, parent):
        """Create the log text area, its scrollbar and the clear button."""
        frame = ttk.LabelFrame(parent, text="通信日志", padding="10")
        frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        self.log_text = tk.Text(frame, height=15, width=100, wrap=tk.WORD)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)

        scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self.log_text.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text.config(yscrollcommand=scrollbar.set)

        ttk.Button(frame, text="清除日志",
                   command=lambda: self.log_text.delete("1.0", tk.END)).pack(
            side=tk.BOTTOM, pady=5)

    # ---------------------------------------------------------- connection

    def toggle_connection(self):
        """Connect when disconnected, otherwise disconnect."""
        if self.connected:
            self.disconnect()
        else:
            self.connect()

    def connect(self):
        """Open the serial port with the settings entered in the UI (8N1, 1 s timeout).

        Any failure (bad baud rate text, port unavailable, ...) is logged and
        shown in an error dialog instead of being raised.
        """
        try:
            com_port = self.com_port.get()
            baud_rate = int(self.baud_rate.get())
            self.serial_port = serial.Serial(
                port=com_port,
                baudrate=baud_rate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1,
            )
            if self.serial_port.is_open:
                self.connected = True
                self.connect_button.config(text="断开")
                self.log(f"成功连接到 {com_port},波特率: {baud_rate}")
            else:
                self.log("无法打开串口连接")
        except Exception as e:  # UI boundary: report the problem, never crash the callback
            self.log(f"连接错误: {str(e)}")
            messagebox.showerror("连接错误", str(e))

    def disconnect(self):
        """Close the serial port if it is open and reset the connection state."""
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
        self.connected = False
        self.connect_button.config(text="连接")
        self.log("已断开连接")

    def log(self, message):
        """Append a timestamped line to the log widget and scroll to the end.

        Args:
            message: Text to append.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)

    # ------------------------------------------------------ frame helpers

    def calculate_crc16(self, data):
        """Return the CRC-16 of ``data`` as two bytes, low byte first.

        Uses initial value 0xFFFF and the reflected polynomial 0xA001,
        processing each byte least-significant bit first.

        Args:
            data: Iterable of byte values (``bytes``/``bytearray``).

        Returns:
            ``bytes`` of length 2: ``[crc & 0xFF, crc >> 8]``.
        """
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return bytes([crc & 0xFF, crc >> 8])

    @staticmethod
    def _swap_byte_pairs(data):
        """Return ``data`` with the two bytes of every 16-bit pair exchanged.

        A trailing unpaired byte is copied unchanged.
        """
        swapped = bytearray()
        for offset in range(0, len(data), 2):
            swapped.extend(reversed(data[offset:offset + 2]))
        return swapped

    def _encode_string_registers(self, text):
        """Encode ``text`` as byte-swapped register data.

        The text is ASCII-encoded (unencodable characters replaced), padded
        with a NUL to an even length, byte-swapped per register and truncated
        to ``MAX_STRING_REGISTERS`` registers so the payload length always
        agrees with the register count sent in the frame.
        """
        raw = text.encode('ascii', errors='replace')
        if len(raw) % 2 != 0:
            raw += b'\x00'
        return self._swap_byte_pairs(raw)[:self.MAX_STRING_REGISTERS * 2]

    # ---------------------------------------------------------------- read

    def batch_read_data(self):
        """Read every register row whose address field is non-empty."""
        for i in range(self.REGISTER_ROWS):
            if self.register_addresses[i].get():
                self.read_single_register(i)

    def read_single_register(self, index):
        """Send a function-0x03 read for row ``index`` and display the result.

        The number of registers requested depends on the row's data type
        (integer 1, float 2, string 10). Errors are logged and shown in a
        dialog; nothing is raised to the caller.

        Args:
            index: Zero-based register row.
        """
        if not self.connected:
            messagebox.showwarning("警告", "请先连接设备")
            return
        try:
            register_address = int(self.register_addresses[index].get())
            device_address = int(self.device_address.get())
            data_type = self.data_types[index].get()
            register_count = self.READ_REGISTER_COUNTS.get(data_type)
            if register_count is None:
                raise ValueError(f"未知的数据类型: {data_type}")

            command = bytes([
                device_address,
                0x03,
                register_address >> 8,
                register_address & 0xFF,
                0x00,
                register_count,
            ])
            command += self.calculate_crc16(command)

            # Timer writes never read their replies; drop anything left over
            # so it is not mistaken for the answer to this request.
            self.serial_port.reset_input_buffer()
            self.serial_port.write(command)
            self.log(f"发送读取命令(寄存器{index + 1}): {command.hex(' ').upper()}")
            time.sleep(0.1)  # give the device time to answer

            if not self.serial_port.in_waiting:
                self.log(f"寄存器{index + 1}未收到响应")
                return
            response = self.serial_port.read(self.serial_port.in_waiting)
            self.log(f"收到响应(寄存器{index + 1}): {response.hex(' ').upper()}")

            if len(response) < 5:
                self.log(f"寄存器{index + 1}响应数据长度不足")
                return
            if response[0] != device_address or response[1] != 0x03:
                self.log(f"寄存器{index + 1}响应中的设备地址或功能码不匹配")
                return

            byte_count = response[2]
            self._show_read_value(index, data_type, response[3:3 + byte_count])
        except Exception as e:  # UI boundary: report the problem, never crash the callback
            self.log(f"寄存器{index + 1}读取数据错误: {str(e)}")
            messagebox.showerror("读取错误", str(e))

    def _show_read_value(self, index, data_type, data_bytes):
        """Decode ``data_bytes`` according to ``data_type`` and publish it for row ``index``."""
        if data_type == "整数":
            if len(data_bytes) < 2:
                self.log(f"寄存器{index + 1}响应数据长度不足")
                return
            # Big-endian unsigned 16-bit register value.
            value = (data_bytes[0] << 8) | data_bytes[1]
            self.read_results[index].set(str(value))
            self.log(f"寄存器{index + 1}读取到整数值: {value}")
        elif data_type == "小数":
            if len(data_bytes) < 4:
                self.log(f"寄存器{index + 1}响应数据长度不足")
                return
            # Unpack exactly four bytes; struct rejects any other length.
            value = struct.unpack('>f', data_bytes[:4])[0]
            self.read_results[index].set(f"{value:.6f}")
            self.log(f"寄存器{index + 1}读取到浮点数值: {value:.6f}")
        elif data_type == "字符串":
            text = self._swap_byte_pairs(data_bytes).decode('ascii', errors='replace').rstrip('\x00')
            self.read_results[index].set(text)
            self.log(f"寄存器{index + 1}读取到字符串: {text}")

    # --------------------------------------------------------------- write

    def batch_write_data(self):
        """Write every register row that has both an address and data to write."""
        for i in range(self.REGISTER_ROWS):
            if self.register_addresses[i].get() and self.write_datas[i].get():
                self.write_single_register(i)

    def write_single_register(self, index):
        """Write the row's data using function 0x06 (integer) or 0x10 (float/string).

        Invalid numeric input shows an input-error dialog; any other failure
        is logged and shown in a write-error dialog. Nothing is raised.

        Args:
            index: Zero-based register row.
        """
        if not self.connected:
            messagebox.showwarning("警告", "请先连接设备")
            return
        try:
            register_address = int(self.register_addresses[index].get())
            device_address = int(self.device_address.get())
            data_type = self.data_types[index].get()
            data_to_write = self.write_datas[index].get()

            if data_type == "整数":
                try:
                    value = int(data_to_write)
                    # bytes() raises ValueError when a field is outside 0..255,
                    # i.e. when the value does not fit in one 16-bit register.
                    command = bytes([
                        device_address,
                        0x06,
                        register_address >> 8,
                        register_address & 0xFF,
                        value >> 8,
                        value & 0xFF,
                    ])
                except ValueError:
                    messagebox.showerror("输入错误", f"寄存器{index + 1}请输入有效的整数")
                    return
            elif data_type == "小数":
                try:
                    value = float(data_to_write)
                    float_bytes = struct.pack('>f', value)
                    command = bytes([
                        device_address,
                        0x10,
                        register_address >> 8,
                        register_address & 0xFF,
                        0x00, 0x02,
                        0x04,
                    ]) + float_bytes
                except ValueError:
                    messagebox.showerror("输入错误", f"寄存器{index + 1}请输入有效的浮点数")
                    return
            elif data_type == "字符串":
                payload = self._encode_string_registers(data_to_write)
                register_count = len(payload) // 2
                command = bytes([
                    device_address,
                    0x10,
                    register_address >> 8,
                    register_address & 0xFF,
                    register_count >> 8,
                    register_count & 0xFF,
                    len(payload),
                ]) + payload
            else:
                raise ValueError(f"未知的数据类型: {data_type}")

            command += self.calculate_crc16(command)

            # Discard stale bytes (e.g. unread timer-write replies) first.
            self.serial_port.reset_input_buffer()
            self.serial_port.write(command)
            self.log(f"发送写入命令(寄存器{index + 1}): {command.hex(' ').upper()}")
            time.sleep(0.1)  # give the device time to answer

            if not self.serial_port.in_waiting:
                self.log(f"寄存器{index + 1}未收到响应")
                return
            response = self.serial_port.read(self.serial_port.in_waiting)
            self.log(f"收到响应(寄存器{index + 1}): {response.hex(' ').upper()}")

            if len(response) < 8:
                self.log(f"寄存器{index + 1}响应数据长度不足")
            elif response[0] == device_address and response[1] in (0x06, 0x10):
                self.log(f"寄存器{index + 1}数据写入成功")
            else:
                self.log(f"寄存器{index + 1}响应中的设备地址或功能码不匹配")
        except Exception as e:  # UI boundary: report the problem, never crash the callback
            self.log(f"寄存器{index + 1}写入数据错误: {str(e)}")
            messagebox.showerror("写入错误", str(e))

    # -------------------------------------------------------------- timers

    def start_timer(self, index):
        """Validate the timer inputs and start periodic writes for task ``index``.

        The start value must end in digits; that trailing number is the
        counter and everything before it is kept as a fixed prefix. Does
        nothing when the task is already running.

        Args:
            index: Zero-based timer task.
        """
        if self.timer_running[index]:
            return

        reg_addr = self.timer_registers[index].get()
        start_value = self.timer_start_values[index].get()
        step = self.timer_steps[index].get()
        interval = self.timer_intervals[index].get()

        if not reg_addr or not start_value or not step or not interval:
            messagebox.showwarning("输入错误", "请填写所有定时任务参数")
            return
        try:
            reg_addr = int(reg_addr)
            step = int(step)
            interval = float(interval)
            # Reject zero, negative, NaN and infinite intervals: they would
            # either flood the event loop or fail when scheduling.
            if not 0 < interval < float("inf"):
                raise ValueError(interval)
        except ValueError:
            messagebox.showwarning("输入错误", "参数格式错误")
            return

        parts = self.parse_increment_value(start_value)
        if not parts:
            messagebox.showwarning("错误", "起始值中未找到数字部分")
            return
        prefix, init_number, num_length = parts

        self.timer_current_values[index] = {
            "prefix": prefix,
            "number": init_number,
            "length": num_length,
            "step": step,
            "reg_addr": reg_addr,
        }
        self.timer_ids[index] = None
        self.timer_running[index] = True
        self.log(f"定时任务{index + 1} 已启动,起始值: {start_value}")
        self.schedule_write(index, interval)

    def parse_increment_value(self, s):
        """Split ``s`` into a prefix and its trailing run of digits.

        Args:
            s: Text such as ``"AB007"``.

        Returns:
            ``(prefix, number, digit_count)`` — e.g. ``("AB", 7, 3)`` — or
            ``None`` when ``s`` does not end in a digit.
        """
        match = re.search(r'\d+$', s)
        if not match:
            return None
        number_str = match.group()
        return s[:match.start()], int(number_str), len(number_str)

    def schedule_write(self, index, interval):
        """Write the next incremented value for task ``index`` and reschedule.

        The counter is advanced by the step *before* each write, so the first
        value written is start + step. Any error stops the task.

        Args:
            index: Zero-based timer task.
            interval: Seconds between writes.
        """
        if not self.timer_running[index]:
            return
        try:
            current = self.timer_current_values[index]
            new_number = current["number"] + current["step"]
            # Zero-pad to the digit count of the original start value.
            new_value = f"{current['prefix']}{new_number:0{int(current['length'])}d}"

            self.write_timer_data(index, current["reg_addr"], new_value)
            if not self.timer_running[index]:
                # write_timer_data stopped the task (device not connected):
                # do not advance the counter or schedule another tick.
                return

            current["number"] = new_number
            self.timer_ids[index] = self.root.after(
                int(interval * 1000),
                lambda: self.schedule_write(index, interval))
        except Exception as e:  # any failure ends the task instead of looping on errors
            self.log(f"定时任务{index + 1} 发生错误: {str(e)}")
            self.stop_timer(index)

    def write_timer_data(self, index, reg_addr, value):
        """Send ``value`` as a string to ``reg_addr`` with function 0x10.

        Stops the task and returns when no device is connected. The device's
        reply is not read here.

        Args:
            index: Zero-based timer task (used for logging and stopping).
            reg_addr: Target register address.
            value: Text to write.

        Raises:
            Exception: Re-raises whatever the encode/send step raised, after
                logging it.
        """
        if not self.connected:
            self.log("错误:未连接设备")
            self.stop_timer(index)
            return
        try:
            device_address = int(self.device_address.get())
            payload = self._encode_string_registers(value)
            reg_count = len(payload) // 2
            command = bytes([
                device_address,
                0x10,
                reg_addr >> 8,
                reg_addr & 0xFF,
                0x00,
                reg_count,
                len(payload),
            ]) + payload
            command += self.calculate_crc16(command)
            self.serial_port.write(command)
            self.log(f"定时任务{index + 1} 写入 {value} 到 {reg_addr}")
        except Exception as e:
            self.log(f"定时任务{index + 1} 写入失败: {str(e)}")
            raise

    def stop_timer(self, index):
        """Cancel the pending callback of task ``index`` and mark it stopped.

        Safe to call when no callback has been scheduled yet (for example when
        the very first write fails).

        Args:
            index: Zero-based timer task.
        """
        if not self.timer_running[index]:
            return
        pending_id = self.timer_ids[index]
        if pending_id is not None:
            self.root.after_cancel(pending_id)
            self.timer_ids[index] = None
        self.timer_running[index] = False
        self.log(f"定时任务{index + 1} 已停止")
# Script entry point: build the window and run the Tk event loop.
if __name__ == "__main__":
    root = tk.Tk()
    # Keep a reference so the interface object lives as long as the window.
    app = TouchscreenInterface(root)
    root.mainloop()