原理:通过python的selenium库登录并获取来的消息,然后进行分发处理执行配置好的脚本
功能:
登录文件传输助手
配置字符串和执行脚本的映射关系
日志记录和查看
环境
python3.11.0
Google Chrome浏览器 版本 140.0.7339.186
chromedriver.exe 需要跟Chrome浏览器匹配
chromedriver下载地址https://registry.npmmirror.com/binary.html?path=chrome-for-testing/

主界面1.png (81.11 KB, 下载次数: 0)
下载附件
2025-9-21 16:39 上传

主界面2.png (63.18 KB, 下载次数: 0)
下载附件
2025-9-21 16:39 上传

主界面3.png (169.71 KB, 下载次数: 0)
下载附件
2025-9-21 16:34 上传
源码
[Python] 纯文本查看 复制代码import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import os
import json
import subprocess
import threading
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
import time
from PIL import Image, ImageTk
import io
import requests
class WeChatHelperGUI:
def __init__(self, root):
self.root = root
self.root.title("微信文件传输助手 - 指令执行工具")
self.root.geometry("800x600")
# 设置中文字体
self.style = ttk.Style()
self.style.configure("TLabel", font=('SimHei', 10))
self.style.configure("TButton", font=('SimHei', 10))
self.style.configure("TEntry", font=('SimHei', 10))
self.style.configure("TText", font=('SimHei', 10))
# 初始化变量
self.driver = None
self.is_logged_in = False
self.config_path = self.get_config_path()
self.last_message = None
self.qr_refresh_timer = None
self.qr_label = None
# 创建界面
self.create_widgets()
# 加载配置
self.load_config()
def get_config_path(self):
"""获取配置文件路径"""
current_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(current_dir, "config.json")
# 如果配置文件不存在,创建空配置文件
if not os.path.exists(config_path):
with open(config_path, 'w', encoding='utf-8') as f:
json.dump({}, f, ensure_ascii=False, indent=2)
return config_path
def create_widgets(self):
"""创建界面组件"""
# 创建标签页控件
tab_control = ttk.Notebook(self.root)
# 登录标签页
login_tab = ttk.Frame(tab_control)
tab_control.add(login_tab, text="登录")
# 配置标签页
config_tab = ttk.Frame(tab_control)
tab_control.add(config_tab, text="指令配置")
# 日志标签页
log_tab = ttk.Frame(tab_control)
tab_control.add(log_tab, text="日志")
# 放置标签页控件
tab_control.pack(expand=1, fill="both")
# 设置登录标签页
self.setup_login_tab(login_tab)
# 设置配置标签页
self.setup_config_tab(config_tab)
# 设置日志标签页
self.setup_log_tab(log_tab)
def setup_login_tab(self, parent):
"""设置登录标签页"""
frame = ttk.LabelFrame(parent, text="微信文件传输助手登录")
frame.pack(fill="both", expand=True, padx=10, pady=10)
# 配置ChromeDriver路径
ttk.Label(frame, text="ChromeDriver路径:").grid(row=0, column=0, sticky="w", padx=10, pady=10)
self.driver_path_var = tk.StringVar(value="D://360Downloads//chromedriver-win64//chromedriver.exe")
ttk.Entry(frame, textvariable=self.driver_path_var, width=50).grid(row=0, column=1, padx=10, pady=10)
# 无头模式选项
self.headless_var = tk.BooleanVar(value=False)
ttk.Checkbutton(frame, text="无头模式", variable=self.headless_var).grid(row=1, column=0, sticky="w", padx=10, pady=10)
# 登录状态标签
self.login_status_var = tk.StringVar(value="未登录")
ttk.Label(frame, textvariable=self.login_status_var, foreground="red").grid(row=1, column=1, sticky="w", padx=10, pady=10)
# 按钮区域
button_frame = ttk.Frame(frame)
button_frame.grid(row=2, column=0, columnspan=2, pady=20)
self.login_button = ttk.Button(button_frame, text="登录微信", command=self.start_login)
self.login_button.pack(side="left", padx=10)
self.logout_button = ttk.Button(button_frame, text="退出登录", command=self.logout, state="disabled")
self.logout_button.pack(side="left", padx=10)
# 二维码显示区域
self.qr_frame = ttk.Frame(frame)
self.qr_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=10)
ttk.Label(self.qr_frame, text="请使用微信扫码登录:").pack(pady=(0, 10))
self.qr_label = ttk.Label(self.qr_frame)
self.qr_label.pack()
# 说明文本
ttk.Label(frame, text="说明:点击登录后,会打开微信文件传输助手网页,请使用微信扫码登录。二维码会自动刷新以保持有效性。",
wraplength=600, justify="left").grid(row=4, column=0, columnspan=2, padx=10, pady=10)
def getQrImg(self):
"""获取二维码图片"""
try:
# 通过qrcode-img class获取二维码图片
qr_img = self.driver.find_element(By.CSS_SELECTOR, '[class="qrcode-img"]')
# 获取二维码图片src
qr_img_src = qr_img.get_attribute('src')
return qr_img_src
except Exception as e:
self.log(f"获取二维码时发生错误: {str(e)}")
return None
def setup_config_tab(self, parent):
"""设置配置标签页"""
frame = ttk.LabelFrame(parent, text="指令映射配置")
frame.pack(fill="both", expand=True, padx=10, pady=10)
# 指令输入区域
ttk.Label(frame, text="指令名称:").grid(row=0, column=0, sticky="w", padx=10, pady=10)
self.command_var = tk.StringVar()
ttk.Entry(frame, textvariable=self.command_var, width=20).grid(row=0, column=1, padx=10, pady=10)
ttk.Label(frame, text="(无需#前缀)").grid(row=0, column=2, sticky="w", padx=0, pady=10)
# 命令路径输入区域
ttk.Label(frame, text="执行命令/BAT路径:").grid(row=1, column=0, sticky="w", padx=10, pady=10)
self.action_var = tk.StringVar()
ttk.Entry(frame, textvariable=self.action_var, width=50).grid(row=1, column=1, padx=10, pady=10)
ttk.Button(frame, text="浏览", command=self.browse_file).grid(row=1, column=2, padx=10, pady=10)
# 添加/更新按钮
ttk.Button(frame, text="添加/更新指令", command=self.add_or_update_command).grid(row=2, column=0, columnspan=3, pady=10)
# 分隔线
ttk.Separator(frame, orient="horizontal").grid(row=3, column=0, columnspan=3, sticky="ew", pady=10)
# 已配置指令列表
ttk.Label(frame, text="已配置指令列表:").grid(row=4, column=0, sticky="w", padx=10, pady=10)
# 创建列表视图
columns = ("command", "action")
self.command_tree = ttk.Treeview(frame, columns=columns, show="headings", height=10)
# 定义列
self.command_tree.heading("command", text="指令名称")
self.command_tree.heading("action", text="执行命令/BAT路径")
# 设置列宽
self.command_tree.column("command", width=150)
self.command_tree.column("action", width=450)
# 绑定双击事件
self.command_tree.bind("", self.on_tree_item_double_click)
# 放置列表视图
self.command_tree.grid(row=5, column=0, columnspan=3, padx=10, pady=10, sticky="nsew")
# 添加滚动条
scrollbar = ttk.Scrollbar(frame, orient="vertical", command=self.command_tree.yview)
self.command_tree.configure(yscroll=scrollbar.set)
scrollbar.grid(row=5, column=3, sticky="ns")
# 删除按钮
ttk.Button(frame, text="删除选中指令", command=self.delete_command).grid(row=6, column=0, columnspan=3, pady=10)
# 设置列权重,使列表可以拉伸
frame.grid_columnconfigure(1, weight=1)
frame.grid_rowconfigure(5, weight=1)
def setup_log_tab(self, parent):
"""设置日志标签页"""
frame = ttk.LabelFrame(parent, text="操作日志")
frame.pack(fill="both", expand=True, padx=10, pady=10)
# 创建滚动文本框
self.log_text = scrolledtext.ScrolledText(frame, wrap=tk.WORD, width=80, height=25, font=('SimHei', 10))
self.log_text.pack(fill="both", expand=True, padx=10, pady=10)
self.log_text.config(state="disabled")
# 清空日志按钮
ttk.Button(frame, text="清空日志", command=self.clear_log).pack(pady=10)
def log(self, message):
"""记录日志"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log_message = f"[{timestamp}] {message}\n"
self.log_text.config(state="normal")
self.log_text.insert(tk.END, log_message)
self.log_text.see(tk.END)
self.log_text.config(state="disabled")
print(log_message.strip())
def clear_log(self):
"""清空日志"""
self.log_text.config(state="normal")
self.log_text.delete(1.0, tk.END)
self.log_text.config(state="disabled")
def start_login(self):
"""开始登录过程"""
# 禁用登录按钮
self.login_button.config(state="disabled")
self.log("开始登录微信文件传输助手...")
# 在新线程中执行登录,避免界面卡顿
login_thread = threading.Thread(target=self.login)
login_thread.daemon = True
login_thread.start()
def login(self):
"""登录微信文件传输助手"""
try:
# 设置Chrome选项
chrome_options = Options()
chrome_options.add_argument('--start-maximized')
# 是否使用无头模式
if self.headless_var.get():
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
# 指定ChromeDriver路径
driver_path = self.driver_path_var.get()
if not os.path.exists(driver_path):
self.log(f"错误:ChromeDriver路径不存在: {driver_path}")
messagebox.showerror("错误", f"ChromeDriver路径不存在: {driver_path}")
self.root.after(0, lambda: self.login_button.config(state="normal"))
return
service = Service(driver_path)
# 初始化WebDriver
self.driver = webdriver.Chrome(service=service, options=chrome_options)
# 访问微信文件传输助手
self.driver.get('https://filehelper.weixin.qq.com/')
self.log("已打开微信文件传输助手网页")
# 等待页面加载
self.wait_done()
# 检查登录状态
status = self.check_login_status()
if status:
self.log("检测到已登录状态")
self.update_login_status(True)
else:
self.log("等待用户扫码登录...")
# 显示并定时刷新二维码
self.root.after(0, self.display_and_refresh_qr_code)
# 循环检查登录状态
login_timeout = 180 # 登录超时时间,单位:秒
start_time = time.time()
while not status and (time.time() - start_time 0:
return False
# 检查是否正在确认登录
login_comp = self.driver.find_elements(By.CSS_SELECTOR, '[class="logining__loading"]')
if len(login_comp) > 0:
return False
# 检查是否存在聊天面板元素,表示已登录
try:
chat_panel = self.driver.find_element(By.ID, 'chatPanel')
if chat_panel:
return True
except Exception as e:
pass
# 默认返回未登录
return False
except Exception as e:
self.log(f"检查登录状态时发生错误: {str(e)}")
return False
def update_login_status(self, is_logged_in):
"""更新登录状态UI"""
self.is_logged_in = is_logged_in
if is_logged_in:
self.login_status_var.set("已登录")
self.root.after(0, lambda: self.login_status_var.set("已登录"))
self.root.after(0, lambda: self.login_button.config(state="disabled"))
self.root.after(0, lambda: self.logout_button.config(state="normal"))
else:
self.login_status_var.set("未登录")
self.root.after(0, lambda: self.login_status_var.set("未登录"))
self.root.after(0, lambda: self.login_button.config(state="normal"))
self.root.after(0, lambda: self.logout_button.config(state="disabled"))
def logout(self):
"""退出登录"""
if self.driver:
self.driver.quit()
self.driver = None
self.log("已退出登录")
self.update_login_status(False)
def start_monitoring_messages(self):
"""开始监控消息"""
def monitor_messages():
while self.driver and self.is_logged_in:
try:
# 获取聊天主体
chat_body = self.driver.find_element(By.ID, 'chatBody')
# 检查是否有消息
try:
have_msg = chat_body.find_element(By.CSS_SELECTOR, '.msg-list')
# 获取消息列表
msg_list = have_msg.find_elements(By.CSS_SELECTOR, '.msg-list__item')
if msg_list:
current_last_msg = msg_list[-1]
# 判断是否是新消息
if self.last_message != current_last_msg:
self.last_message = current_last_msg
# 检查消息类型
msg_type_element = current_last_msg.find_element(By.CSS_SELECTOR, '.msg-item__content > div')
msg_type_class = msg_type_element.get_attribute('class')
# 处理文本消息
if 'msg-text' in msg_type_class:
text = msg_type_element.text
self.handle_message(text)
elif 'msg-image' in msg_type_class:
self.log('收到图片消息')
elif 'msg-voice' in msg_type_class:
self.log('收到语音消息')
elif 'msg-emotion' in msg_type_class:
self.log('收到表情消息')
else:
self.log('收到其他类型消息')
except Exception:
# 没有消息,继续等待
pass
# 短暂休眠,避免CPU占用过高
time.sleep(1)
except Exception as e:
self.log(f"监控消息时发生错误: {str(e)}")
# 如果发生错误,可能是页面结构变化或已退出登录,检查登录状态
if self.driver:
try:
new_status = self.check_login_status()
if not new_status:
self.log("检测到已退出登录")
self.root.after(0, lambda: self.update_login_status(False))
break
except:
break
# 在新线程中监控消息
monitor_thread = threading.Thread(target=monitor_messages)
monitor_thread.daemon = True
monitor_thread.start()
def handle_message(self, text):
"""处理接收到的消息"""
if text.startswith('#'):
self.log(f'收到指令消息: {text}')
# 解析指令
command = text[1:].strip() # 去掉指令前缀和空格
self.execute_command(command)
else:
self.log(f'收到普通消息: {text}')
def execute_command(self, command):
"""执行指令"""
try:
# 读取配置
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
if command in config:
action = config[command]
self.log(f'执行指令: {command} -> {action}')
# 执行命令
result = subprocess.run([f'{action}'], shell=True, capture_output=True, text=True)
# 记录执行结果
if result.returncode == 0:
self.log(f'指令执行成功: {result.stdout.strip()}')
else:
self.log(f'指令执行失败: {result.stderr.strip()}')
else:
self.log(f'未知指令: {command}')
except Exception as e:
self.log(f'执行指令时发生错误: {str(e)}')
def load_config(self):
"""加载配置"""
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# 清空现有列表
for item in self.command_tree.get_children():
self.command_tree.delete(item)
# 填充列表
for command, action in config.items():
self.command_tree.insert("", tk.END, values=(command, action))
except Exception as e:
self.log(f'加载配置时发生错误: {str(e)}')
def browse_file(self):
"""浏览文件"""
from tkinter import filedialog
file_path = filedialog.askopenfilename(title="选择可执行文件或BAT文件",
filetypes=[("所有文件", "*.*"), ("批处理文件", "*.bat"), ("可执行文件", "*.exe")])
if file_path:
self.action_var.set(file_path)
def add_or_update_command(self):
"""添加或更新指令"""
command = self.command_var.get().strip()
action = self.action_var.get().strip()
if not command:
messagebox.showwarning("警告", "请输入指令名称")
return
if not action:
messagebox.showwarning("警告", "请输入执行命令或BAT路径")
return
try:
# 读取配置
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# 更新配置
config[command] = action
# 保存配置
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 刷新列表
self.load_config()
# 清空输入框
self.command_var.set("")
self.action_var.set("")
self.log(f'已添加/更新指令: {command} -> {action}')
messagebox.showinfo("成功", "指令添加/更新成功")
except Exception as e:
self.log(f'添加/更新指令时发生错误: {str(e)}')
messagebox.showerror("错误", f"添加/更新指令时发生错误: {str(e)}")
def on_tree_item_double_click(self, event):
"""双击列表项事件"""
item = self.command_tree.selection()[0]
command = self.command_tree.item(item, "values")[0]
action = self.command_tree.item(item, "values")[1]
# 填充到输入框
self.command_var.set(command)
self.action_var.set(action)
def delete_command(self):
"""删除选中的指令"""
selected_items = self.command_tree.selection()
if not selected_items:
messagebox.showwarning("警告", "请先选择要删除的指令")
return
# 确认删除
if not messagebox.askyesno("确认删除", "确定要删除选中的指令吗?"):
return
try:
# 读取配置
with open(self.config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# 删除选中的指令
for item in selected_items:
command = self.command_tree.item(item, "values")[0]
if command in config:
del config[command]
self.log(f'已删除指令: {command}')
# 保存配置
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 刷新列表
self.load_config()
messagebox.showinfo("成功", "指令删除成功")
except Exception as e:
self.log(f'删除指令时发生错误: {str(e)}')
messagebox.showerror("错误", f"删除指令时发生错误: {str(e)}")
if __name__ == '__main__':
root = tk.Tk()
# 设置中文字体支持
root.option_add("*Font", "SimHei 10")
app = WeChatHelperGUI(root)
root.mainloop()
下载地址
https://wwzp.lanzoum.com/ipkQk36oys0b
密码:52pj
欢迎反馈bug