某音多账号发布工具

查看 19|回复 1
作者:orange0top   
一个某音发布的本地小工具
功能:
检查登录
登录账号
发布视频
查询最新一条视频的数据
基于selenium去模拟操作发布过程
[Python] 纯文本查看 复制代码[mw_shl_code=python,true]from tkinter import ttk, filedialog, messagebox
import os
import sys
import threading
import json
import time
import uuid  # 导入uuid模块用于生成唯一ID
import shutil  # 导入shutil模块用于删除目录
import tkinter as tk
# 确保中文显示正常
sys.stdout.reconfigure(encoding='utf-8')
# 添加当前目录到Python路径,以便导入其他模块
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入所需的模块
from checkLogin import checkLogin
from login import Login
from publishVideo import publishVideo
from seeStatus import seePublishListInfo
class DouYinManager:
    def __init__(self, root):
        self.root = root
        self.root.title("抖音账号管理系统")
        self.root.geometry("1000x600")  # 增加窗口宽度以容纳更多列
        
        # 设置中文字体
        self.style = ttk.Style()
        if sys.platform == 'win32':
            self.style.configure(".", font=("SimHei", 10))
        
        # 存储账号配置
        self.accounts = []
        # 配置目录根路径
        self.profiles_root_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "chrome_profiles")
        os.makedirs(self.profiles_root_dir, exist_ok=True)
        
        # 创建UI
        self.create_ui()
        
        # 加载已保存的账号配置
        self.load_accounts()
        
    def create_ui(self):
        # 主框架
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        
        # 账号列表框架
        accounts_frame = ttk.LabelFrame(main_frame, text="账号列表", padding="10")
        accounts_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
        
        # 创建Treeview用于显示账号列表(支持多选和显示详细信息)
        columns = ("name", "id", "profile_dir", "status")
        self.accounts_tree = ttk.Treeview(accounts_frame, columns=columns, show="headings", selectmode="extended")
        
        # 定义列标题和宽度
        self.accounts_tree.heading("name", text="账号名称")
        self.accounts_tree.heading("id", text="账号ID")
        self.accounts_tree.heading("profile_dir", text="配置目录")
        self.accounts_tree.heading("status", text="登录状态")
        
        self.accounts_tree.column("name", width=120, anchor=tk.CENTER)
        self.accounts_tree.column("id", width=180, anchor=tk.CENTER)  # 调整宽度以显示完整ID
        self.accounts_tree.column("profile_dir", width=400, anchor=tk.W)  # 配置目录使用左对齐
        self.accounts_tree.column("status", width=100, anchor=tk.CENTER)
        
        # 添加水平滚动条以适应长路径
        hscrollbar = ttk.Scrollbar(accounts_frame, orient=tk.HORIZONTAL, command=self.accounts_tree.xview)
        self.accounts_tree.configure(xscroll=hscrollbar.set)
        
        # 添加垂直滚动条
        vscrollbar = ttk.Scrollbar(accounts_frame, orient=tk.VERTICAL, command=self.accounts_tree.yview)
        self.accounts_tree.configure(yscroll=vscrollbar.set)
        
        # 布局Treeview和滚动条
        self.accounts_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        vscrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        hscrollbar.pack(side=tk.BOTTOM, fill=tk.X)
        
        # 账号操作按钮
        buttons_frame = ttk.Frame(accounts_frame)
        buttons_frame.pack(side=tk.RIGHT, fill=tk.Y)
        
        ttk.Button(buttons_frame, text="添加账号", command=self.add_account).pack(pady=(0, 5), fill=tk.X)
        ttk.Button(buttons_frame, text="删除账号", command=self.delete_account).pack(pady=(0, 5), fill=tk.X)
        ttk.Button(buttons_frame, text="编辑账号", command=self.edit_account).pack(pady=(0, 5), fill=tk.X)
        ttk.Button(buttons_frame, text="刷新状态", command=self.refresh_status).pack(fill=tk.X)
        
        # 功能按钮框架
        functions_frame = ttk.LabelFrame(main_frame, text="功能操作", padding="10")
        functions_frame.pack(fill=tk.X, pady=(0, 10))
        
        ttk.Button(functions_frame, text="检查登录状态", command=self.check_login_status).pack(side=tk.LEFT, padx=5)
        ttk.Button(functions_frame, text="登录账号", command=self.login_account).pack(side=tk.LEFT, padx=5)
        ttk.Button(functions_frame, text="发布视频", command=self.publish_video).pack(side=tk.LEFT, padx=5)
        ttk.Button(functions_frame, text="查看发布列表", command=self.view_publish_list).pack(side=tk.LEFT, padx=5)
        
        # 日志显示区域
        log_frame = ttk.LabelFrame(main_frame, text="操作日志", padding="10")
        log_frame.pack(fill=tk.BOTH, expand=True)
        
        self.log_text = tk.Text(log_frame, height=10, wrap=tk.WORD)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text.config(yscrollcommand=scrollbar.set)
        
    def load_accounts(self):
        """加载已保存的账号配置"""
        try:
            if os.path.exists("accounts.json"):
                with open("accounts.json", 'r', encoding='utf-8') as f:
                    self.accounts = json.load(f)
                    # 确保每个账号都有登录状态字段
                    for account in self.accounts:
                        if 'status' not in account:
                            account['status'] = "未知"
                    self.update_accounts_list()
        except Exception as e:
            self.log(f"加载账号配置失败: {str(e)}")
   
    def save_accounts(self):
        """保存账号配置"""
        try:
            with open("accounts.json", 'w', encoding='utf-8') as f:
                json.dump(self.accounts, f, ensure_ascii=False, indent=4)
        except Exception as e:
            self.log(f"保存账号配置失败: {str(e)}")
   
    def update_accounts_list(self):
        """更新账号列表显示"""
        # 清空现有列表
        for item in self.accounts_tree.get_children():
            self.accounts_tree.delete(item)
        
        # 添加账号到列表,显示完整的账号ID和配置目录
        for account in self.accounts:
            self.accounts_tree.insert("", tk.END, values=(
                account['name'],
                account['id'],
                account['profile_dir'],
                account['status']
            ))
   
    def get_selected_accounts(self):
        """获取选中的账号列表"""
        selected_items = self.accounts_tree.selection()
        if not selected_items:
            return []
            
        selected_accounts = []
        for item in selected_items:
            name = self.accounts_tree.item(item, "values")[0]
            # 查找对应的账号对象
            for account in self.accounts:
                if account['name'] == name:
                    selected_accounts.append(account)
                    break
        
        return selected_accounts
   
    def add_account(self):
        """添加新账号,自动创建与唯一ID绑定的配置目录"""
        # 创建添加账号对话框
        dialog = tk.Toplevel(self.root)
        dialog.title("添加账号")
        dialog.geometry("400x150")
        dialog.transient(self.root)
        dialog.grab_set()
        
        # 账号名称
        ttk.Label(dialog, text="账号名称: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
        name_var = tk.StringVar()
        ttk.Entry(dialog, textvariable=name_var, width=30).grid(row=0, column=1, padx=10, pady=10)
        
        def save_account():
            name = name_var.get().strip()
            
            if not name:
                messagebox.showerror("错误", "账号名称不能为空")
                return
            
            # 检查账号名称是否已存在
            for account in self.accounts:
                if account['name'] == name:
                    messagebox.showerror("错误", "账号名称已存在")
                    return
            
            # 生成唯一ID
            account_id = str(uuid.uuid4())
            
            # 创建与唯一ID绑定的配置目录
            profile_dir = os.path.join(self.profiles_root_dir, account_id)
            os.makedirs(profile_dir, exist_ok=True)
            
            # 添加账号(包含登录状态字段)
            self.accounts.append({
                'id': account_id,
                'name': name,
                'profile_dir': profile_dir,
                'status': "未登录"
            })
            self.save_accounts()
            self.update_accounts_list()
            self.log(f"已添加账号 '{name}',ID: {account_id},配置目录: {profile_dir}")
            dialog.destroy()
        
        # 按钮
        button_frame = ttk.Frame(dialog)
        button_frame.grid(row=1, column=0, columnspan=2, pady=10)
        ttk.Button(button_frame, text="确定", command=save_account).pack(side=tk.LEFT, padx=10)
        ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
   
    def delete_account(self):
        """删除选中的账号,同时删除对应的配置目录及其中文件"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个或多个账号")
            return
        
        # 确认删除
        account_names = ", ".join([account['name'] for account in selected_accounts])
        if messagebox.askyesno("确认", f"确定要删除选中的 {len(selected_accounts)} 个账号及其配置目录吗?\n此操作不可恢复!"):
            try:
                # 删除账号配置和目录
                for account in selected_accounts:
                    profile_dir = account['profile_dir']
                    account_name = account['name']
                    account_id = account['id']
                    
                    # 删除账号配置
                    self.accounts.remove(account)
                    
                    # 删除配置目录及其中文件
                    if os.path.exists(profile_dir):
                        shutil.rmtree(profile_dir)
                        self.log(f"已删除账号 '{account_name}' 的配置目录: {profile_dir}")
                    
                    self.log(f"已删除账号 '{account_name}' (ID: {account_id})")
               
                self.save_accounts()
                self.update_accounts_list()
               
            except Exception as e:
                self.log(f"删除账号时出错: {str(e)}")
                messagebox.showerror("错误", f"删除账号时出错: {str(e)}")
   
    def edit_account(self):
        """编辑选中的账号(仅允许修改名称)"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个账号")
            return
        
        if len(selected_accounts) > 1:
            messagebox.showinfo("提示", "一次只能编辑一个账号")
            return
        
        account = selected_accounts[0]
        
        # 创建编辑账号对话框
        dialog = tk.Toplevel(self.root)
        dialog.title("编辑账号")
        dialog.geometry("400x150")
        dialog.transient(self.root)
        dialog.grab_set()
        
        # 账号名称
        ttk.Label(dialog, text="账号名称: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
        name_var = tk.StringVar(value=account['name'])
        ttk.Entry(dialog, textvariable=name_var, width=30).grid(row=0, column=1, padx=10, pady=10)
        
        # 显示账号ID(不可编辑)
        ttk.Label(dialog, text="账号ID: ").grid(row=1, column=0, sticky=tk.W, padx=10, pady=5)
        ttk.Label(dialog, text=account['id']).grid(row=1, column=1, sticky=tk.W, padx=10, pady=5)
        
        def update_account():
            name = name_var.get().strip()
            
            if not name:
                messagebox.showerror("错误", "账号名称不能为空")
                return
            
            # 检查账号名称是否已存在(排除当前账号)
            for acc in self.accounts:
                if acc['name'] == name and acc != account:
                    messagebox.showerror("错误", "账号名称已存在")
                    return
            
            # 更新账号名称
            old_name = account['name']
            account['name'] = name
            self.save_accounts()
            self.update_accounts_list()
            
            self.log(f"已更新账号 '{old_name}' 名称为 '{name}'")
            dialog.destroy()
        
        # 按钮
        button_frame = ttk.Frame(dialog)
        button_frame.grid(row=2, column=0, columnspan=2, pady=5)
        ttk.Button(button_frame, text="确定", command=update_account).pack(side=tk.LEFT, padx=10)
        ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
   
    def refresh_status(self):
        """刷新所有账号的登录状态"""
        self.log("开始刷新所有账号的登录状态...")
        
        def task():
            for account in self.accounts:
                try:
                    # 检查登录状态
                    result = checkLogin(account['profile_dir'])
                    if  result:
                        account['status'] = "已登录"
                    else:
                        account['status'] = "未登录"
                    # 在主线程中更新UI
                    self.root.after(0, self.update_accounts_list)
                    self.log(f"账号 '{account['name']}' 登录状态: {account['status']}")
                except Exception as e:
                    self.log(f"刷新账号 '{account['name']}' 状态时出错: {str(e)}")
            
            self.save_accounts()
            self.log("所有账号状态刷新完成")
        
        self.run_in_thread(task)
   
    def log(self, message):
        """向日志区域添加消息"""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
        self.log_text.see(tk.END)
   
    def run_in_thread(self, func, *args):
        """在新线程中运行函数,避免UI卡死"""
        thread = threading.Thread(target=func, args=args)
        thread.daemon = True
        thread.start()
   
    def check_login_status(self):
        """检查选中账号的登录状态"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个或多个账号")
            return
        
        self.log(f"开始检查 {len(selected_accounts)} 个账号的登录状态...")
        
        def task():
            for account in selected_accounts:
                try:
                    result = checkLogin(account['profile_dir'])
                    # 更新账号状态
                    if  result:
                        account['status'] = "已登录"
                    else:
                        account['status'] = "未登录"
                    # 在主线程中更新UI
                    self.root.after(0, self.update_accounts_list)
                    self.log(f"账号 '{account['name']}' 登录状态检查完成: {account['status']}")
                except Exception as e:
                    self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
            
            self.save_accounts()
        
        self.run_in_thread(task)
   
    def login_account(self):
        """登录选中的账号"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个或多个账号")
            return
        
        self.log(f"开始登录 {len(selected_accounts)} 个账号...")
        
        def task():
            for account in selected_accounts:
                try:
                    self.log(f"开始登录账号 '{account['name']}'...")
                    result = Login(account['profile_dir'])
                    # 更新账号状态
                    if result:
                        account['status'] = "已登录"
                        # 在主线程中更新UI
                        self.root.after(0, self.update_accounts_list)
                    self.log(f"账号 '{account['name']}' 登录完成: {account['status'] }")
                except Exception as e:
                    self.log(f"登录账号 '{account['name']}' 时出错: {str(e)}")
            
            self.save_accounts()
        
        self.run_in_thread(task)
   
    def publish_video(self):
        """发布视频到选中的账号(先检查实际登录状态)"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个或多个账号")
            return
        
        self.log(f"开始检查 {len(selected_accounts)} 个账号的实际登录状态...")
        
        def task():
            # 检查所有选中账号的实际登录状态
            logged_in_accounts = []
            not_logged_in_accounts = []
            
            for account in selected_accounts:
                try:
                    # 实际调用checkLogin函数检查登录状态
                    is_logged_in = checkLogin(account['profile_dir'])
                    if is_logged_in:
                        logged_in_accounts.append(account)
                        # 更新账号状态
                        account['status'] = "已登录"
                    else:
                        not_logged_in_accounts.append(account['name'])
                        account['status'] = "未登录"
                    # 在主线程中更新UI
                    self.root.after(0, self.update_accounts_list)
                except Exception as e:
                    self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
                    not_logged_in_accounts.append(account['name'])
            
            self.save_accounts()
            
            # 如果有未登录的账号,显示提示
            if not_logged_in_accounts:
                not_logged_in_str = ", ".join(not_logged_in_accounts)
                self.log(f"以下账号未登录,无法发布视频:{not_logged_in_str}")
                self.root.after(0, lambda: messagebox.showinfo("提示", f"以下账号未登录,请先登录后再发布视频:{not_logged_in_str}"))
                return
            
            # 如果所有账号都已登录,创建发布视频对话框
            if logged_in_accounts:
                self.root.after(0, lambda: self.create_publish_dialog(logged_in_accounts))
               
        self.run_in_thread(task)
        
    def create_publish_dialog(self, logged_in_accounts):
        """创建发布视频对话框"""
        dialog = tk.Toplevel(self.root)
        dialog.title("发布视频")
        dialog.geometry("400x300")
        dialog.transient(self.root)
        dialog.grab_set()
        
        # 视频文件
        ttk.Label(dialog, text="视频文件: ").grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)
        video_path_var = tk.StringVar()
        ttk.Entry(dialog, textvariable=video_path_var, width=25).grid(row=0, column=1, padx=10, pady=10, sticky=tk.W)
        
        def browse_file():
            file_path = filedialog.askopenfilename(title="选择视频文件", filetypes=[("视频文件", "*.mp4;*.avi;*.mov")])
            if file_path:
                video_path_var.set(file_path)
        
        ttk.Button(dialog, text="浏览...", command=browse_file).grid(row=0, column=2, padx=5, pady=10)
        
        # 视频标题
        ttk.Label(dialog, text="视频标题: ").grid(row=1, column=0, sticky=tk.W, padx=10, pady=10)
        title_var = tk.StringVar()
        ttk.Entry(dialog, textvariable=title_var, width=30).grid(row=1, column=1, padx=10, pady=10, columnspan=2, sticky=tk.W)
        
        # 视频简介
        ttk.Label(dialog, text="视频简介: ").grid(row=2, column=0, sticky=tk.NW, padx=10, pady=10)
        text_box = tk.Text(dialog, width=25, height=5)
        text_box.grid(row=2, column=1, padx=10, pady=10, columnspan=2, sticky=tk.W)
        
        def publish():
            video_path = video_path_var.get().strip()
            title = title_var.get().strip()
            jianjie = text_box.get("1.0", tk.END).strip()
            
            if not video_path:
                messagebox.showerror("错误", "请选择视频文件")
                return
            
            if not os.path.exists(video_path):
                messagebox.showerror("错误", "视频文件不存在")
                return
            
            if not title:
                messagebox.showerror("错误", "请输入视频标题")
                return
            
            dialog.destroy()
            account_names = ", ".join([account['name'] for account in logged_in_accounts])
            self.log(f"开始向 {len(logged_in_accounts)} 个账号发布视频: {account_names}")
            
            def task():
                for account in logged_in_accounts:
                    try:
                        self.log(f"开始发布视频到账号 '{account['name']}'...")
                        result = publishVideo(
                            account['profile_dir'],
                            jianjie,
                            title,
                            video_path
                        )
                        self.log(f"向账号 '{account['name']}' 发布视频完成: {result}")
                    except Exception as e:
                        self.log(f"向账号 '{account['name']}' 发布视频时出错: {str(e)}")
            
            self.run_in_thread(task)
        
        # 按钮
        button_frame = ttk.Frame(dialog)
        button_frame.grid(row=3, column=0, columnspan=3, pady=10)
        ttk.Button(button_frame, text="发布", command=publish).pack(side=tk.LEFT, padx=10)
        ttk.Button(button_frame, text="取消", command=dialog.destroy).pack(side=tk.LEFT, padx=10)
   
    def view_publish_list(self):
        """查看选中账号的发布列表(先检查实际登录状态)"""
        selected_accounts = self.get_selected_accounts()
        if not selected_accounts:
            messagebox.showinfo("提示", "请先选择一个或多个账号")
            return
        
        self.log(f"开始检查 {len(selected_accounts)} 个账号的实际登录状态...")
        
        def task():
            # 检查所有选中账号的实际登录状态
            logged_in_accounts = []
            not_logged_in_accounts = []
            
            for account in selected_accounts:
                try:
                    # 实际调用checkLogin函数检查登录状态
                    is_logged_in = checkLogin(account['profile_dir'])
                    if is_logged_in:
                        logged_in_accounts.append(account)
                        # 更新账号状态
                        account['status'] = "已登录"
                    else:
                        not_logged_in_accounts.append(account['name'])
                        account['status'] = "未登录"
                    # 在主线程中更新UI
                    self.root.after(0, self.update_accounts_list)
                except Exception as e:
                    self.log(f"检查账号 '{account['name']}' 登录状态时出错: {str(e)}")
                    not_logged_in_accounts.append(account['name'])
            
            self.save_accounts()
            
            # 如果有未登录的账号,显示提示
            if not_logged_in_accounts:
                not_logged_in_str = ", ".join(not_logged_in_accounts)
                self.log(f"以下账号未登录,无法查看发布列表:{not_logged_in_str}")
                self.root.after(0, lambda: messagebox.showinfo("提示", f"以下账号未登录,请先登录后再查看发布列表:{not_logged_in_str}"))
                return
            
            # 查看已登录账号的发布列表
            if logged_in_accounts:
                account_names = ", ".join([account['name'] for account in logged_in_accounts])
                self.log(f"开始查看 {len(logged_in_accounts)} 个账号的发布列表: {account_names}")
               
                for account in logged_in_accounts:
                    try:
                        self.log(f"开始查看账号 '{account['name']}' 的发布列表...")
                        # 调用seeStatus.py中的seePublishListInfo函数
                        result = seePublishListInfo(account['profile_dir'])
                        self.log(f"查看账号 '{account['name']}' 的发布列表完成:")
                        #输出最新一条的数据
                        if result['code'] == 0 and len(result['videos']) > 0:
                            latest_video = result['videos'][0]
                            metrics_str = '\n'.join([f"    {key}: {value}" for key, value in latest_video['metrics'].items()])
                            self.log(f"最新视频信息:\n" \
                                    f"  发布状态: {latest_video['status']}\n" \
                                    f"  发布标题: {latest_video['title']}\n" \
                                    f"  发布时间: {latest_video['publish_time']}\n" \
                                    f"  统计数据:\n{metrics_str}")
                        
                    except Exception as e:
                        self.log(f"查看账号 '{account['name']}' 的发布列表时出错: {str(e)}")
        
        self.run_in_thread(task)
if __name__ == "__main__":
    root = tk.Tk()
    app = DouYinManager(root)
    root.mainloop()[/mw_shl_code]
环境
python3.11.0
Google Chrome浏览器 版本 140.0.7339.186
chromedriver.exe 需要跟Chrome浏览器匹配
启动方法
py douyin_manager.py


主界面.png (94.36 KB, 下载次数: 0)
下载附件
2025-9-21 10:50 上传

下载链接 https://wwzp.lanzoum.com/i4DPD36o6a2h
密码:52pj
欢迎反馈bug

账号, 状态

Williamongh   

请在帖子中插入部分关键代码
本版块仅限分享编程技术和源码相关内容,发布帖子必须带上关键代码和具体功能介绍
您需要登录后才可以回帖 登录 | 立即注册

返回顶部