功能:
检查登录
登录账号
发布视频
查询最新一条视频的数据
基于selenium去模拟操作发布过程
[Python] 纯文本查看 复制代码[mw_shl_code=python,true]from tkinter import ttk, filedialog, messagebox
import os
import sys
import threading
import json
import time
import uuid # 导入uuid模块用于生成唯一ID
import shutil # 导入shutil模块用于删除目录
import tkinter as tk
# 确保中文显示正常
sys.stdout.reconfigure(encoding='utf-8')
# 添加当前目录到Python路径,以便导入其他模块
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入所需的模块
from checkLogin import checkLogin
from login import Login
from publishVideo import publishVideo
from seeStatus import seePublishListInfo
class DouYinManager:
def __init__(self, root):
self.root = root
self.root.title("抖音账号管理系统")
self.root.geometry("1000x600") # 增加窗口宽度以容纳更多列
# 设置中文字体
self.style = ttk.Style()
if sys.platform == 'win32':
self.style.configure(".", font=("SimHei", 10))
# 存储账号配置
self.accounts = []
# 配置目录根路径
self.profiles_root_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "chrome_profiles")
os.makedirs(self.profiles_root_dir, exist_ok=True)
# 创建UI
self.create_ui()
# 加载已保存的账号配置
self.load_accounts()
def create_ui(self):
# 主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 账号列表框架
accounts_frame = ttk.LabelFrame(main_frame, text="账号列表", padding="10")
accounts_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
# 创建Treeview用于显示账号列表(支持多选和显示详细信息)
columns = ("name", "id", "profile_dir", "status")
self.accounts_tree = ttk.Treeview(accounts_frame, columns=columns, show="headings", selectmode="extended")
# 定义列标题和宽度
self.accounts_tree.heading("name", text="账号名称")
self.accounts_tree.heading("id", text="账号ID")
self.accounts_tree.heading("profile_dir", text="配置目录")
self.accounts_tree.heading("status", text="登录状态")
self.accounts_tree.column("name", width=120, anchor=tk.CENTER)
self.accounts_tree.column("id", width=180, anchor=tk.CENTER) # 调整宽度以显示完整ID
self.accounts_tree.column("profile_dir", width=400, anchor=tk.W) # 配置目录使用左对齐
self.accounts_tree.column("status", width=100, anchor=tk.CENTER)
# 添加水平滚动条以适应长路径
hscrollbar = ttk.Scrollbar(accounts_frame, orient=tk.HORIZONTAL, command=self.accounts_tree.xview)
self.accounts_tree.configure(xscroll=hscrollbar.set)
# 添加垂直滚动条
vscrollbar = ttk.Scrollbar(accounts_frame, orient=tk.VERTICAL, command=self.accounts_tree.yview)
self.accounts_tree.configure(yscroll=vscrollbar.set)
# 布局Treeview和滚动条
self.accounts_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
vscrollbar.pack(side=tk.RIGHT, fill=tk.Y)
hscrollbar.pack(side=tk.BOTTOM, fill=tk.X)
# 账号操作按钮
buttons_frame = ttk.Frame(accounts_frame)
buttons_frame.pack(side=tk.RIGHT, fill=tk.Y)
ttk.Button(buttons_frame, text="添加账号", command=self.add_account).pack(pady=(0, 5), fill=tk.X)
ttk.Button(buttons_frame, text="删除账号", command=self.delete_account).pack(pady=(0, 5), fill=tk.X)
ttk.Button(buttons_frame, text="编辑账号", command=self.edit_account).pack(pady=(0, 5), fill=tk.X)
ttk.Button(buttons_frame, text="刷新状态", command=self.refresh_status).pack(fill=tk.X)
# 功能按钮框架
functions_frame = ttk.LabelFrame(main_frame, text="功能操作", padding="10")
functions_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Button(functions_frame, text="检查登录状态", command=self.check_login_status).pack(side=tk.LEFT, padx=5)
ttk.Button(functions_frame, text="登录账号", command=self.login_account).pack(side=tk.LEFT, padx=5)
ttk.Button(functions_frame, text="发布视频", command=self.publish_video).pack(side=tk.LEFT, padx=5)
ttk.Button(functions_frame, text="查看发布列表", command=self.view_publish_list).pack(side=tk.LEFT, padx=5)
# 日志显示区域
log_frame = ttk.LabelFrame(main_frame, text="操作日志", padding="10")
log_frame.pack(fill=tk.BOTH, expand=True)
self.log_text = tk.Text(log_frame, height=10, wrap=tk.WORD)
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.config(yscrollcommand=scrollbar.set)
def load_accounts(self):
"""加载已保存的账号配置"""
try:
if os.path.exists("accounts.json"):
with open("accounts.json", 'r', encoding='utf-8') as f:
self.accounts = json.load(f)
# 确保每个账号都有登录状态字段
for account in self.accounts:
if 'status' not in account:
account['status'] = "未知"
self.update_accounts_list()
except Exception as e:
self.log(f"加载账号配置失败: {str(e)}")
def save_accounts(self):
"""保存账号配置"""
try:
with open("accounts.json", 'w', encoding='utf-8') as f:
json.dump(self.accounts, f, ensure_ascii=False, indent=4)
except Exception as e:
self.log(f"保存账号配置失败: {str(e)}")
def update_accounts_list(self):
"""更新账号列表显示"""
# 清空现有列表
for item in self.accounts_tree.get_children():
self.accounts_tree.delete(item)
# 添加账号到列表,显示完整的账号ID和配置目录
for account in self.accounts:
self.accounts_tree.insert("", tk.END, values=(
account['name'],
account['id'],
account['profile_dir'],
account['status']
))
def get_selected_accounts(self):
"""获取选中的账号列表"""
selected_items = self.accounts_tree.selection()
if not selected_items:
return []
selected_accounts = []
for item in selected_items:
name = self.accounts_tree.item(item, "values")[0]
# 查找对应的账号对象
for account in self.accounts:
if account['name'] == name:
selected_accounts.append(account)
break
return selected_accounts
def add_account(self):
"""添加新账号,自动创建与唯一ID绑定的配置目录"""
# 创建添加账号对话框
dialog = tk.Toplevel(self.root)
dialog.title("添加账号")
dialog.geometry("400x150")
dialog.transient(self.root)
dialog.grab_set()
# 账号名称
ttk.Label(dialog, text="账号名称: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
name_var = tk.StringVar()
ttk.Entry(dialog, textvariable=name_var, width=30).grid(row=0, column=1, padx=10, pady=10)
def save_account():
name = name_var.get().strip()
if not name:
messagebox.showerror("错误", "账号名称不能为空")
return
# 检查账号名称是否已存在
for account in self.accounts:
if account['name'] == name:
messagebox.showerror("错误", "账号名称已存在")
return
# 生成唯一ID
account_id = str(uuid.uuid4())
# 创建与唯一ID绑定的配置目录
profile_dir = os.path.join(self.profiles_root_dir, account_id)
os.makedirs(profile_dir, exist_ok=True)
# 添加账号(包含登录状态字段)
self.accounts.append({
'id': account_id,
'name': name,
'profile_dir': profile_dir,
'status': "未登录"
})
self.save_accounts()
self.update_accounts_list()
self.log(f"已添加账号 '{name}',ID: {account_id},配置目录: {profile_dir}")
dialog.destroy()
# 按钮
button_frame = ttk.Frame(dialog)
button_frame.grid(row=1, column=0, columnspan=2, pady=10)
ttk.Button(button_frame, text="确定", command=save_account).pack(side=tk.LEFT, padx=10)
ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
def delete_account(self):
"""删除选中的账号,同时删除对应的配置目录及其中文件"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个或多个账号")
return
# 确认删除
account_names = ", ".join([account['name'] for account in selected_accounts])
if messagebox.askyesno("确认", f"确定要删除选中的 {len(selected_accounts)} 个账号及其配置目录吗?\n此操作不可恢复!"):
try:
# 删除账号配置和目录
for account in selected_accounts:
profile_dir = account['profile_dir']
account_name = account['name']
account_id = account['id']
# 删除账号配置
self.accounts.remove(account)
# 删除配置目录及其中文件
if os.path.exists(profile_dir):
shutil.rmtree(profile_dir)
self.log(f"已删除账号 '{account_name}' 的配置目录: {profile_dir}")
self.log(f"已删除账号 '{account_name}' (ID: {account_id})")
self.save_accounts()
self.update_accounts_list()
except Exception as e:
self.log(f"删除账号时出错: {str(e)}")
messagebox.showerror("错误", f"删除账号时出错: {str(e)}")
def edit_account(self):
"""编辑选中的账号(仅允许修改名称)"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个账号")
return
if len(selected_accounts) > 1:
messagebox.showinfo("提示", "一次只能编辑一个账号")
return
account = selected_accounts[0]
# 创建编辑账号对话框
dialog = tk.Toplevel(self.root)
dialog.title("编辑账号")
dialog.geometry("400x150")
dialog.transient(self.root)
dialog.grab_set()
# 账号名称
ttk.Label(dialog, text="账号名称: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
name_var = tk.StringVar(value=account['name'])
ttk.Entry(dialog, textvariable=name_var, width=30).grid(row=0, column=1, padx=10, pady=10)
# 显示账号ID(不可编辑)
ttk.Label(dialog, text="账号ID: ").grid(row=1, column=0, sticky=tk.W, padx=10, pady=5)
ttk.Label(dialog, text=account['id']).grid(row=1, column=1, sticky=tk.W, padx=10, pady=5)
def update_account():
name = name_var.get().strip()
if not name:
messagebox.showerror("错误", "账号名称不能为空")
return
# 检查账号名称是否已存在(排除当前账号)
for acc in self.accounts:
if acc['name'] == name and acc != account:
messagebox.showerror("错误", "账号名称已存在")
return
# 更新账号名称
old_name = account['name']
account['name'] = name
self.save_accounts()
self.update_accounts_list()
self.log(f"已更新账号 '{old_name}' 名称为 '{name}'")
dialog.destroy()
# 按钮
button_frame = ttk.Frame(dialog)
button_frame.grid(row=2, column=0, columnspan=2, pady=5)
ttk.Button(button_frame, text="确定", command=update_account).pack(side=tk.LEFT, padx=10)
ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
def refresh_status(self):
"""刷新所有账号的登录状态"""
self.log("开始刷新所有账号的登录状态...")
def task():
for account in self.accounts:
try:
# 检查登录状态
result = checkLogin(account['profile_dir'])
if result:
account['status'] = "已登录"
else:
account['status'] = "未登录"
# 在主线程中更新UI
self.root.after(0, self.update_accounts_list)
self.log(f"账号 '{account['name']}' 登录状态: {account['status']}")
except Exception as e:
self.log(f"刷新账号 '{account['name']}' 状态时出错: {str(e)}")
self.save_accounts()
self.log("所有账号状态刷新完成")
self.run_in_thread(task)
def log(self, message):
"""向日志区域添加消息"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
self.log_text.see(tk.END)
def run_in_thread(self, func, *args):
"""在新线程中运行函数,避免UI卡死"""
thread = threading.Thread(target=func, args=args)
thread.daemon = True
thread.start()
def check_login_status(self):
"""检查选中账号的登录状态"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个或多个账号")
return
self.log(f"开始检查 {len(selected_accounts)} 个账号的登录状态...")
def task():
for account in selected_accounts:
try:
result = checkLogin(account['profile_dir'])
# 更新账号状态
if result:
account['status'] = "已登录"
else:
account['status'] = "未登录"
# 在主线程中更新UI
self.root.after(0, self.update_accounts_list)
self.log(f"账号 '{account['name']}' 登录状态检查完成: {account['status']}")
except Exception as e:
self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
self.save_accounts()
self.run_in_thread(task)
def login_account(self):
"""登录选中的账号"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个或多个账号")
return
self.log(f"开始登录 {len(selected_accounts)} 个账号...")
def task():
for account in selected_accounts:
try:
self.log(f"开始登录账号 '{account['name']}'...")
result = Login(account['profile_dir'])
# 更新账号状态
if result:
account['status'] = "已登录"
# 在主线程中更新UI
self.root.after(0, self.update_accounts_list)
self.log(f"账号 '{account['name']}' 登录完成: {account['status'] }")
except Exception as e:
self.log(f"登录账号 '{account['name']}' 时出错: {str(e)}")
self.save_accounts()
self.run_in_thread(task)
def publish_video(self):
"""发布视频到选中的账号(先检查实际登录状态)"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个或多个账号")
return
self.log(f"开始检查 {len(selected_accounts)} 个账号的实际登录状态...")
def task():
# 检查所有选中账号的实际登录状态
logged_in_accounts = []
not_logged_in_accounts = []
for account in selected_accounts:
try:
# 实际调用checkLogin函数检查登录状态
is_logged_in = checkLogin(account['profile_dir'])
if is_logged_in:
logged_in_accounts.append(account)
# 更新账号状态
account['status'] = "已登录"
else:
not_logged_in_accounts.append(account['name'])
account['status'] = "未登录"
# 在主线程中更新UI
self.root.after(0, self.update_accounts_list)
except Exception as e:
self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
not_logged_in_accounts.append(account['name'])
self.save_accounts()
# 如果有未登录的账号,显示提示
if not_logged_in_accounts:
not_logged_in_str = ", ".join(not_logged_in_accounts)
self.log(f"以下账号未登录,无法发布视频:{not_logged_in_str}")
self.root.after(0, lambda: messagebox.showinfo("提示", f"以下账号未登录,请先登录后再发布视频:{not_logged_in_str}"))
return
# 如果所有账号都已登录,创建发布视频对话框
if logged_in_accounts:
self.root.after(0, lambda: self.create_publish_dialog(logged_in_accounts))
self.run_in_thread(task)
def create_publish_dialog(self, logged_in_accounts):
"""创建发布视频对话框"""
dialog = tk.Toplevel(self.root)
dialog.title("发布视频")
dialog.geometry("400x300")
dialog.transient(self.root)
dialog.grab_set()
# 视频文件
ttk.Label(dialog, text="视频文件: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
video_path_var = tk.StringVar()
ttk.Entry(dialog, textvariable=video_path_var, width=25).grid(row=0, column=1, padx=10, pady=10, sticky=tk.W)
def browse_file():
file_path = filedialog.askopenfilename(title="选择视频文件", filetypes=[("视频文件", "*.mp4;*.avi;*.mov")])
if file_path:
video_path_var.set(file_path)
ttk.Button(dialog, text="浏览...", command=browse_file).grid(row=0, column=2, padx=5, pady=10)
# 视频标题
ttk.Label(dialog, text="视频标题: ").grid(row=1, column=0, sticky=tk.W, padx=10, pady=10)
title_var = tk.StringVar()
ttk.Entry(dialog, textvariable=title_var, width=30).grid(row=1, column=1, padx=10, pady=10, columnspan=2, sticky=tk.W)
# 视频简介
ttk.Label(dialog, text="视频简介: ").grid(row=2, column=0, sticky=tk.NW, padx=10, pady=10)
text_box = tk.Text(dialog, width=25, height=5)
text_box.grid(row=2, column=1, padx=10, pady=10, columnspan=2, sticky=tk.W)
def publish():
video_path = video_path_var.get().strip()
title = title_var.get().strip()
jianjie = text_box.get("1.0", tk.END).strip()
if not video_path:
messagebox.showerror("错误", "请选择视频文件")
return
if not os.path.exists(video_path):
messagebox.showerror("错误", "视频文件不存在")
return
if not title:
messagebox.showerror("错误", "请输入视频标题")
return
dialog.destroy()
account_names = ", ".join([account['name'] for account in logged_in_accounts])
self.log(f"开始向 {len(logged_in_accounts)} 个账号发布视频: {account_names}")
def task():
for account in logged_in_accounts:
try:
self.log(f"开始发布视频到账号 '{account['name']}'...")
result = publishVideo(
account['profile_dir'],
jianjie,
title,
video_path
)
self.log(f"向账号 '{account['name']}' 发布视频完成: {result}")
except Exception as e:
self.log(f"向账号 '{account['name']}' 发布视频时出错: {str(e)}")
self.run_in_thread(task)
# 按钮
button_frame = ttk.Frame(dialog)
button_frame.grid(row=3, column=0, columnspan=3, pady=10)
ttk.Button(button_frame, text="发布", command=publish).pack(side=tk.LEFT, padx=10)
ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
def view_publish_list(self):
"""查看选中账号的发布列表(先检查实际登录状态)"""
selected_accounts = self.get_selected_accounts()
if not selected_accounts:
messagebox.showinfo("提示", "请先选择一个或多个账号")
return
self.log(f"开始检查 {len(selected_accounts)} 个账号的实际登录状态...")
def task():
# 检查所有选中账号的实际登录状态
logged_in_accounts = []
not_logged_in_accounts = []
for account in selected_accounts:
try:
# 实际调用checkLogin函数检查登录状态
is_logged_in = checkLogin(account['profile_dir'])
if is_logged_in:
logged_in_accounts.append(account)
# 更新账号状态
account['status'] = "已登录"
else:
not_logged_in_accounts.append(account['name'])
account['status'] = "未登录"
# 在主线程中更新UI
self.root.after(0, self.update_accounts_list)
except Exception as e:
self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
not_logged_in_accounts.append(account['name'])
self.save_accounts()
# 如果有未登录的账号,显示提示
if not_logged_in_accounts:
not_logged_in_str = ", ".join(not_logged_in_accounts)
self.log(f"以下账号未登录,无法查看发布列表:{not_logged_in_str}")
self.root.after(0, lambda: messagebox.showinfo("提示", f"以下账号未登录,请先登录后再查看发布列表:{not_logged_in_str}"))
return
# 查看已登录账号的发布列表
if logged_in_accounts:
account_names = ", ".join([account['name'] for account in logged_in_accounts])
self.log(f"开始查看 {len(logged_in_accounts)} 个账号的发布列表: {account_names}")
for account in logged_in_accounts:
try:
self.log(f"开始查看账号 '{account['name']}' 的发布列表...")
# 调用seeStatus.py中的seePublishListInfo函数
result = seePublishListInfo(account['profile_dir'])
self.log(f"查看账号 '{account['name']}' 的发布列表完成:")
#输出最新一条的数据
if result['code'] == 0 and len(result['videos']) > 0:
latest_video = result['videos'][0]
metrics_str = '\n'.join([f" {key}: {value}" for key, value in latest_video['metrics'].items()])
self.log(f"最新视频信息:\n" \
f" 发布状态: {latest_video['status']}\n" \
f" 发布标题: {latest_video['title']}\n" \
f" 发布时间: {latest_video['publish_time']}\n" \
f" 统计数据:\n{metrics_str}")
except Exception as e:
self.log(f"查看账号 '{account['name']}' 的发布列表时出错: {str(e)}")
self.run_in_thread(task)
if __name__ == "__main__":
root = tk.Tk()
app = DouYinManager(root)
root.mainloop()[/mw_shl_code]
环境
python3.11.0
Google Chrome浏览器 版本 140.0.7339.186
chromedriver.exe 需要跟Chrome浏览器匹配
启动方法
py douyin_manager.py

主界面.png (94.36 KB, 下载次数: 0)
下载附件
2025-9-21 10:50 上传
下载链接 https://wwzp.lanzoum.com/i4DPD36o6a2h
密码:52pj
欢迎反馈bug