商业数据分析工具little

查看 34|回复 5
作者:xiaowang667   
因本人工作需要,故写了这个数据分析工具方便我工作使用,大家自取。
Excel数据分析工具 产品介绍与操作说明
产品介绍
Excel数据分析工具是一款专门用于快速分析Excel数据的桌面应用程序,旨在帮助用户轻松进行同比、环比等数据分析工作。无需复杂的Excel公式或函数,只需简单几步操作,即可获得专业的数据分析结果。
核心功能
两列对比分析:快速对比两列数据,自动计算变化量和变化率(适用于同比、环比分析)
时间序列环比分析:支持纵向和横向时间序列数据,自动计算连续期间的环比变化
数据模板生成:提供Excel和CSV两种格式的数据模板,帮助用户规范数据格式
数据过滤:支持自定义过滤条件,精准筛选分析数据
大文件优化:针对大型数据集进行性能优化,确保流畅的分析体验
结果导出:分析结果可直接导出为Excel文件,便于进一步处理和分享
应用场景
企业财务数据分析(销售额、利润等指标的同比/环比分析)
业务运营数据监控(月度/季度/年度业绩变化趋势分析)
市场销售数据比较(不同产品、区域或渠道的业绩对比)
任何需要快速计算数据变化量和变化率的场景
操作说明
1. 启动程序
双击运行`excel_analysis_tool.py`文件即可启动程序。程序启动后,将显示主界面,包括文件选择、数据区域选择、分析选项、高级选项和数据预览区域。
2. 数据准备
程序提供了两种方式来准备分析数据:
方式一:使用数据模板
1. 在主界面的"高级选项"区域,点击"获取数据模板"按钮
2. 在弹出的对话框中选择模板格式:
   选择"是":获取Excel格式模板(需要安装xlsxwriter模块)
   选择"否":获取CSV格式模板(无需额外依赖)
3. 如果选择Excel格式,系统会生成包含三种数据格式的模板:
   两列对比分析模板
   时间序列模板(纵向)
   时间序列模板(横向)
   使用说明
4. 如果选择CSV格式,系统会进一步询问模板类型,可选择:
   两列对比分析模板
   时间序列模板
5. 根据模板中的示例数据,按照相同格式填写您的实际数据
方式二:直接使用现有Excel文件
确保您的Excel文件符合以下要求:
数据列必须为数值类型
不要在数据中包含合并单元格
对于时间序列分析,需要有时间列(如月份、季度等)
3. 加载数据
1. 在"Excel文件选择"区域,点击"浏览"按钮
2. 选择要分析的Excel文件(仅支持.xlsx格式)
3. 系统会自动读取文件中的所有Sheet,并在Sheet下拉框中显示
4. 选择要分析的Sheet,系统将自动加载该Sheet的列信息
5. 加载完成后,数据预览区域将显示前100行数据(可在高级选项中调整预览行数)
4. 设置分析参数
根据您选择的分析类型,设置相应的参数:
两列对比分析
1. 在"分析选项"区域,选择"两列对比分析(同比/环比)"
2. 在"数据区域选择"中:
选择"当前期数据列":要分析的当前期数据所在列
选择"对比期数据列":用于对比的上期数据所在列
时间序列环比分析
1. 在"分析选项"区域,选择"时间序列环比分析(后一个月与前一个月对比)"
2. 在"数据区域选择"中:
选择"当前期数据列":要分析的数值数据所在列
选择"时间列":时间信息所在列
设置"当前数据行(横向数据)":如果是横向数据结构,设置数据所在行号(默认为第2行)
5. 应用数据过滤(可选)
1. 在"高级选项"区域的"数据过滤"输入框中,输入过滤表达式
2. 点击"应用过滤"按钮,系统将根据过滤条件筛选数据
3. 数据预览区域将显示过滤后的结果
4. 若要清除过滤,清空过滤输入框并点击"应用过滤"按钮
6. 执行分析
1. 点击主界面底部的"开始分析"按钮
2. 系统将在后台线程中执行分析,进度条显示分析进度
3. 分析完成后,结果将显示在数据预览区域,并自动弹出保存对话框
4. 选择保存路径,将分析结果导出为Excel文件
7. 查看分析结果
分析结果包含以下信息:
两列对比分析:当前期值、对比期值、变化量、变化率(%)、合计行
时间序列分析:原始时间序列数据、分隔线、各期间环比分析结果、合计行
高级功能说明
数据模板功能详解
Excel格式模板
两列对比分析模板:适用于直接比较两列数据的情况,如本期与上期销售额对比
时间序列模板(纵向):适用于时间数据和数值数据分别在不同列的情况
时间序列模板(横向):适用于时间数据在行、数值数据在列的情况
使用说明:包含各类模板的详细使用指南
CSV格式模板
两列对比分析模板:简单的CSV格式,包含产品名称、本期销售额、上期销售额三列
时间序列模板:简单的CSV格式,包含月份和销售额两列
数据过滤表达式语法
过滤表达式使用pandas的eval语法,支持以下操作:
- 比较运算符:`>`, `=`, `
- 逻辑运算符:`and`, `or`, `not`
- 列名引用:直接使用列名,如`销售额 > 10000`
- 复合表达式:如`(销售额 > 10000) and (毛利率
大文件处理
程序对大文件(超过10000行或50列)进行了特殊优化:
自动调整预览行数,避免界面卡顿
使用分块处理技术,提高数据分析效率
导出时自动选择高效的引擎
注意事项
1. 确保您的Excel文件格式正确,不包含复杂的合并单元格或特殊格式
2. 进行Excel格式模板生成时,需安装xlsxwriter模块(可通过`pip install xlsxwriter`安装)
3. 对于大型数据集,分析过程可能需要较长时间,请耐心等待
4. 如遇程序无响应,请检查任务管理器中是否有多个Python进程运行
5. 如有其他问题,请查看程序界面底部的状态提示或错误信息
常见问题解答
**Q: 为什么选择Excel格式模板时提示需要安装xlsxwriter?**
A: 生成Excel格式模板需要使用xlsxwriter库。您可以通过命令行执行`pip install xlsxwriter`来安装。如果您不想安装额外模块,可以选择CSV格式模板。
**Q: 时间序列分析支持哪些时间格式?**
A: 程序支持各种常见的时间格式,如"1月"、"2月"、"2023-01"、"2023/01"等。只要时间序列是连续的,程序就能正确计算环比。
**Q: 如何处理分析结果中的N/A值?**
A: N/A值通常表示无法计算变化率(如对比期值为0的情况)。您可以在导出的Excel文件中手动处理这些值。
**Q: 程序支持Mac或Linux系统吗?**
A: 理论上支持,只要安装了Python和必要的依赖库。但程序主要是为Windows系统优化设计的。
[color=]因为我不会打包所以我把python项目做成压缩包传百度网盘了,希望大家理解
[color=]通过网盘分享的文件:excel_analysis_tool.zip
[color=]链接: https://pan.baidu.com/s/1Degtrsa3PIDJNTrUgSRZsg?pwd=a14w 提取码: a14w
[color=]--来自百度网盘超级会员v6的分享
以下是运行效果:


image.png (36.94 KB, 下载次数: 2)
下载附件
2025-8-29 14:20 上传

[Python] 纯文本查看 复制代码import pandas as pd
import pandas as pd
import tkinter as tk
from tkinter import filedialog, ttk
from tkinter.messagebox import showinfo, askyesno
import time
import threading
class ExcelAnalysisApp:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title('Excel数据分析工具')
        self.root.geometry('900x700')
        
        # 文件选择区域
        self.file_frame = ttk.LabelFrame(self.root, text='Excel文件选择')
        self.file_frame.pack(pady=10, padx=10, fill='x')
        
        self.file_path = tk.StringVar()
        ttk.Entry(self.file_frame, textvariable=self.file_path, width=50).pack(side='left', padx=5)
        ttk.Button(self.file_frame, text='浏览', command=self.select_file).pack(side='left')
        
        # Sheet选择
        ttk.Label(self.file_frame, text='Sheet:').pack(side='left', padx=5)
        self.sheet_name = tk.StringVar()
        self.sheet_combo = ttk.Combobox(self.file_frame, textvariable=self.sheet_name, width=15)
        self.sheet_combo.pack(side='left', padx=5)
        self.sheet_combo.bind('>', self.on_sheet_selected)
        
        # 区域选择
        self.range_frame = ttk.LabelFrame(self.root, text='数据区域选择')
        self.range_frame.pack(pady=10, padx=10, fill='x')
        
        ttk.Label(self.range_frame, text='当前期数据列:').pack(side='left', padx=5)
        self.current_col = ttk.Combobox(self.range_frame, width=15)
        self.current_col.pack(side='left', padx=5)
        
        ttk.Label(self.range_frame, text='对比期数据列:').pack(side='left', padx=5)
        self.previous_col = ttk.Combobox(self.range_frame, width=15)
        self.previous_col.pack(side='left', padx=5)
        
        # 添加数据行选择
        ttk.Label(self.range_frame, text='当前数据行(横向数据):').pack(side='left', padx=5)
        self.data_row = tk.StringVar(value='2')
        ttk.Entry(self.range_frame, textvariable=self.data_row, width=5).pack(side='left', padx=5)
        
        ttk.Label(self.range_frame, text='目标分析行(环比对比):').pack(side='left', padx=5)
        self.target_analysis_row = tk.StringVar(value='3')
        ttk.Entry(self.range_frame, textvariable=self.target_analysis_row, width=5).pack(side='left', padx=5)
        
        # 分析选项
        self.analysis_frame = ttk.LabelFrame(self.root, text='分析选项')
        self.analysis_frame.pack(pady=10, padx=10, fill='x')
        
        # 使用单选按钮替代复选框,确保二选一
        self.analysis_type = tk.StringVar(value='comparison')
        
        # 创建一个框架来放置单选按钮,使其对齐更好
        radio_frame = ttk.Frame(self.analysis_frame)
        radio_frame.pack(anchor='w', pady=5)
        
        ttk.Radiobutton(radio_frame, text='两列对比分析(同比/环比)', variable=self.analysis_type, value='comparison').pack(anchor='w', padx=10)
        ttk.Radiobutton(radio_frame, text='时间序列环比分析(后一个月与前一个月对比)', variable=self.analysis_type, value='time_series').pack(anchor='w', padx=10)
        
        # 时间列选择
        ttk.Label(self.analysis_frame, text='时间列:').pack(side='left', padx=5, pady=5)
        self.time_col = ttk.Combobox(self.analysis_frame, width=15)
        self.time_col.pack(side='left', padx=5, pady=5)
        
        # 高级选项
        advanced_frame = ttk.LabelFrame(self.root, text='高级选项')
        advanced_frame.pack(pady=10, padx=10, fill='x')
        
        ttk.Label(advanced_frame, text='预览行数:').pack(side='left', padx=5)
        self.preview_rows = tk.StringVar(value='100')
        ttk.Entry(advanced_frame, textvariable=self.preview_rows, width=5).pack(side='left', padx=5)
        
        ttk.Label(advanced_frame, text='数据过滤:').pack(side='left', padx=5)
        self.filter_var = tk.StringVar()
        ttk.Entry(advanced_frame, textvariable=self.filter_var, width=30).pack(side='left', padx=5)
        ttk.Button(advanced_frame, text='应用过滤', command=self.apply_filter).pack(side='left', padx=5)
        
        # 获取数据模板按钮
        ttk.Button(advanced_frame, text='获取数据模板', command=self.get_data_template).pack(side='right', padx=10, pady=5)
        
        # 执行按钮
        ttk.Button(self.root, text='开始分析', command=self.run_analysis).pack(pady=10)
        
        # 状态显示
        self.status_var = tk.StringVar()
        self.status_var.set('准备就绪')
        self.status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
        
        # 进度条
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(fill=tk.X, padx=10, pady=5)
        self.progress_bar['value'] = 0
        
        # 数据预览
        self.preview_frame = ttk.LabelFrame(self.root, text='数据预览')
        self.preview_frame.pack(pady=10, padx=10, fill='both', expand=True)
        
        # 添加滚动条
        self.tree_frame = ttk.Frame(self.preview_frame)
        self.tree_frame.pack(fill='both', expand=True)
        
        self.tree_scroll_y = ttk.Scrollbar(self.tree_frame, orient=tk.VERTICAL)
        self.tree_scroll_x = ttk.Scrollbar(self.tree_frame, orient=tk.HORIZONTAL)
        
        self.tree = ttk.Treeview(self.tree_frame, yscrollcommand=self.tree_scroll_y.set,
                                 xscrollcommand=self.tree_scroll_x.set)
        
        self.tree_scroll_y.config(command=self.tree.yview)
        self.tree_scroll_x.config(command=self.tree.xview)
        
        self.tree_scroll_y.pack(side=tk.RIGHT, fill=tk.Y)
        self.tree_scroll_x.pack(side=tk.BOTTOM, fill=tk.X)
        self.tree.pack(fill='both', expand=True)
        
        # 数据存储
        self.current_df = None
        self.filtered_df = None
        
        self.root.mainloop()
   
    def select_file(self):
        filepath = filedialog.askopenfilename(filetypes=[('Excel文件', '*.xlsx')])
        if filepath:
            self.file_path.set(filepath)
            try:
                # 获取所有sheet名称
                xl = pd.ExcelFile(filepath)
                sheet_names = xl.sheet_names
                self.sheet_combo['values'] = sheet_names
                if sheet_names:
                    self.sheet_name.set(sheet_names[0])
                    self.on_sheet_selected(None)
            except Exception as e:
                showinfo('错误', f'读取文件信息失败: {str(e)}')
   
    def on_sheet_selected(self, event):
        filepath = self.file_path.get()
        sheet = self.sheet_name.get()
        if filepath and sheet:
            self.load_columns(filepath, sheet)
   
    def get_data_template(self):
        """生成并下载数据模板(支持Excel和CSV格式)"""
        try:
            # 询问用户希望的模板格式
            response = askyesno('选择模板格式', '是否需要Excel格式的模板?\n\n(选择"是"需要安装xlsxwriter模块,选择"否"将生成CSV格式模板,无需额外依赖)')
            
            if response:
                # 用户选择Excel格式
                try:
                    # 尝试导入xlsxwriter以验证是否安装
                    import xlsxwriter
                    
                    # 创建一个ExcelWriter对象
                    file_path = filedialog.asksaveasfilename(
                        defaultextension='.xlsx',
                        filetypes=[('Excel文件', '*.xlsx')],
                        title='保存Excel数据模板'
                    )
                    
                    if not file_path:
                        return
                    
                    # 创建一个工作簿
                    with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
                        # 创建两列对比分析的模板
                        comparison_data = {
                            '产品名称': ['产品A', '产品B', '产品C', '产品D'],
                            '本期销售额': [10000, 15000, 8000, 12000],
                            '上期销售额': [9000, 16000, 7500, 11000]
                        }
                        comparison_df = pd.DataFrame(comparison_data)
                        comparison_df.to_excel(writer, sheet_name='两列对比分析模板', index=False)
                        
                        # 创建时间序列环比分析的模板(纵向)
                        time_series_vertical_data = {
                            '月份': ['1月', '2月', '3月', '4月', '5月', '6月'],
                            '销售额': [10000, 12000, 11000, 13000, 15000, 14000]
                        }
                        time_series_vertical_df = pd.DataFrame(time_series_vertical_data)
                        time_series_vertical_df.to_excel(writer, sheet_name='时间序列模板(纵向)', index=False)
                        
                        # 创建时间序列环比分析的模板(横向)
                        time_series_horizontal_data = {
                            '指标': ['月份', '销售额'],
                            '1月': ['1月', 10000],
                            '2月': ['2月', 12000],
                            '3月': ['3月', 11000],
                            '4月': ['4月', 13000],
                            '5月': ['5月', 15000],
                            '6月': ['6月', 14000]
                        }
                        time_series_horizontal_df = pd.DataFrame(time_series_horizontal_data)
                        time_series_horizontal_df.to_excel(writer, sheet_name='时间序列模板(横向)', index=False)
                        
                        # 添加说明Sheet
                        instructions_data = {
                            '说明项': ['两列对比分析使用方法', '时间序列环比分析(纵向)使用方法', '时间序列环比分析(横向)使用方法', '注意事项'],
                            '详细说明': [
                                '在"两列对比分析模板"Sheet中,填写需要对比的两列数据,如本期和上期数据。使用时选择对应的列即可。',
                                '在"时间序列模板(纵向)"Sheet中,第一列填写时间(如月份),第二列填写对应的数据。使用时选择时间列和数据列。',
                                '在"时间序列模板(横向)"Sheet中,第一行填写月份,第二行填写对应的数据。使用时设置数据行为2(因为第一行是表头)。',
                                '1. 数据列必须为数值类型;2. 不要在数据中包含合并单元格;3. 使用前请确保已安装必要的Python库。'
                            ]
                        }
                        instructions_df = pd.DataFrame(instructions_data)
                        instructions_df.to_excel(writer, sheet_name='使用说明', index=False)
                    
                    showinfo('成功', f'Excel数据模板已保存到 {file_path}\n\n模板包含三种数据格式:\n1. 两列对比分析模板(适用于同比/环比)\n2. 时间序列模板(纵向)(适用于时间序列环比分析)\n3. 时间序列模板(横向)(适用于时间序列环比分析)')
                except ImportError:
                    # xlsxwriter未安装,提供CSV替代方案
                    self._generate_csv_template()
            else:
                # 用户直接选择CSV格式
                self._generate_csv_template()
        except Exception as e:
            showinfo('错误', f'生成模板时出错: {str(e)}')
            print(f'生成模板时出错: {str(e)}')
            
    def _generate_csv_template(self):
        """生成CSV格式的数据模板(无需额外依赖)"""
        try:
            # 询问用户想要哪种类型的CSV模板
            response = askyesno('选择CSV模板类型', '需要两列对比分析的模板吗?\n\n(选择"是"生成两列对比模板,选择"否"生成时间序列模板)')
            
            if response:
                # 两列对比分析模板
                file_path = filedialog.asksaveasfilename(
                    defaultextension='.csv',
                    filetypes=[('CSV文件', '*.csv')],
                    title='保存两列对比分析模板'
                )
               
                if not file_path:
                    return
               
                comparison_data = {
                    '产品名称': ['产品A', '产品B', '产品C', '产品D'],
                    '本期销售额': [10000, 15000, 8000, 12000],
                    '上期销售额': [9000, 16000, 7500, 11000]
                }
                comparison_df = pd.DataFrame(comparison_data)
                comparison_df.to_csv(file_path, index=False, encoding='utf-8-sig')
               
                showinfo('成功', f'两列对比分析CSV模板已保存到 {file_path}\n\n使用说明:\n1. 填写需要对比的两列数据(如本期和上期数据)\n2. 确保数据列为数值类型\n3. 不要包含合并单元格\n4. 使用程序时选择对应的列即可进行同比/环比分析')
            else:
                # 时间序列模板(纵向)
                file_path = filedialog.asksaveasfilename(
                    defaultextension='.csv',
                    filetypes=[('CSV文件', '*.csv')],
                    title='保存时间序列模板'
                )
               
                if not file_path:
                    return
               
                time_series_data = {
                    '月份': ['1月', '2月', '3月', '4月', '5月', '6月'],
                    '销售额': [10000, 12000, 11000, 13000, 15000, 14000]
                }
                time_series_df = pd.DataFrame(time_series_data)
                time_series_df.to_csv(file_path, index=False, encoding='utf-8-sig')
               
                showinfo('成功', f'时间序列CSV模板已保存到 {file_path}\n\n使用说明:\n1. 第一列填写时间(如月份)\n2. 第二列填写对应的数据\n3. 确保数据列为数值类型\n4. 使用程序时选择时间列和数据列进行时间序列环比分析')
        except Exception as e:
            showinfo('错误', f'生成CSV模板时出错: {str(e)}')
            print(f'生成CSV模板时出错: {str(e)}')
            
    def load_columns(self, filepath, sheet):
        try:
            # 对于大文件,使用read_excel的优化选项
            df = pd.read_excel(filepath, sheet_name=sheet, engine='openpyxl')
            
            # 保存当前数据
            self.current_df = df
            self.filtered_df = df.copy()
            
            # 设置列选择器
            columns = [str(col) for col in df.columns]
            self.current_col['values'] = columns
            self.previous_col['values'] = columns
            self.time_col['values'] = columns
            
            # 检查数据大小,对大数据集进行特殊处理
            row_count, col_count = df.shape
            if row_count > 10000 or col_count > 50:
                self.status_var.set(f'已加载大文件: {row_count}行, {col_count}列,请选择分析列')
                # 只预览前N行,N由用户设置
                try:
                    preview_rows = int(self.preview_rows.get())
                except ValueError:
                    preview_rows = 100
                self.show_preview(df.head(preview_rows))
               
                # 提醒用户这是大文件
                askyesno('提示', f'这是一个大文件({row_count}行),分析可能需要较长时间,是否继续?')
            else:
                self.status_var.set(f'文件已加载: {row_count}行, {col_count}列,请选择分析列')
                # 显示数据预览
                try:
                    preview_rows = int(self.preview_rows.get())
                except ValueError:
                    preview_rows = 100
                self.show_preview(df.head(preview_rows))
        except Exception as e:
            showinfo('错误', f'加载文件失败: {str(e)}')
            self.status_var.set('加载失败')
   
    def apply_filter(self):
        if self.current_df is None:
            showinfo('提示', '请先加载文件')
            return
        
        filter_expr = self.filter_var.get()
        if not filter_expr:
            self.filtered_df = self.current_df.copy()
            self.status_var.set('已清除过滤条件')
            try:
                preview_rows = int(self.preview_rows.get())
            except ValueError:
                preview_rows = 100
            self.show_preview(self.filtered_df.head(preview_rows))
            return
        
        try:
            # 使用eval进行数据过滤
            mask = self.current_df.eval(filter_expr)
            self.filtered_df = self.current_df[mask]
            
            # 显示过滤结果
            row_count = len(self.filtered_df)
            self.status_var.set(f'已应用过滤,显示 {row_count} 行数据')
            
            try:
                preview_rows = int(self.preview_rows.get())
            except ValueError:
                preview_rows = 100
            self.show_preview(self.filtered_df.head(preview_rows))
        except Exception as e:
            showinfo('过滤错误', f'过滤表达式错误: {str(e)}')
   
    def show_preview(self, df):
        # 清空现有数据
        for i in self.tree.get_children():
            self.tree.delete(i)
        
        if df.empty:
            self.status_var.set('没有数据可显示')
            return
        
        # 优化列显示,处理大量列的情况
        max_cols = 50  # 限制最多显示的列数
        if len(df.columns) > max_cols:
            # 只显示前max_cols列
            cols_to_display = list(df.columns)[:max_cols]
            # 添加一列表示还有隐藏的列
            cols_to_display.append(f'+{len(df.columns) - max_cols} 列')
            df_display = df[df.columns[:max_cols]].copy()
            # 添加一个空列作为占位符
            df_display[f'+{len(df.columns) - max_cols} 列'] = ''
        else:
            cols_to_display = list(df.columns)
            df_display = df.copy()
        
        # 设置列
        self.tree['columns'] = cols_to_display
        for col in cols_to_display:
            # 确保列名显示为原始文本
            col_name = str(col)
            self.tree.heading(col, text=col_name)
            
            # 动态调整列宽
            if col in df.columns:
                # 找出该列中最长的字符串长度
                if pd.api.types.is_numeric_dtype(df[col]):
                    # 数值列固定宽度
                    width = 100
                else:
                    # 文本列根据内容调整
                    max_len = max(df[col].astype(str).apply(len).max(), len(col_name))
                    width = min(max_len * 10 + 20, 300)  # 限制最大宽度
            else:
                width = 100
            
            self.tree.column(col, width=width, anchor='center')
              
        # 确保数据正确显示
        # 创建DataFrame的副本,避免SettingWithCopyWarning
        df_display = df_display.copy()
        
        # 优化数据转换逻辑
        for col in df_display.columns:
            if col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    # 数值类型处理,格式化显示
                    df_display[col] = df_display[col].apply(lambda x: f'{x:.2f}' if pd.notna(x) and isinstance(x, (int, float)) else '')
                else:
                    # 非数值类型确保显示为字符串
                    df_display[col] = df_display[col].astype(str).fillna('')
            else:
                df_display[col] = ''
        
        # 分批添加数据,优化性能
        batch_size = 500  # 每批处理的行数
        total_rows = len(df_display)
        
        # 对于大量数据,只显示前N行
        try:
            preview_rows = int(self.preview_rows.get())
        except ValueError:
            preview_rows = 100
        
        display_rows = min(total_rows, preview_rows)
        
        for i in range(0, display_rows, batch_size):
            end_idx = min(i + batch_size, display_rows)
            batch = df_display.iloc[i:end_idx]
            
            for _, row in batch.iterrows():
                # 转换所有值为字符串,确保中文显示
                str_values = [str(x) if x is not None and str(x) != 'nan' else '' for x in row]
                self.tree.insert('', 'end', values=str_values)            
        
        # 如果有更多行没有显示,添加一个提示行
        if total_rows > display_rows:
            self.tree.insert('', 'end', values=[f'... 还有 {total_rows - display_rows} 行未显示 ...'] + [''] * (len(cols_to_display) - 1))
   
    def run_analysis(self):
        filepath = self.file_path.get()
        current_col = self.current_col.get()
        previous_col = self.previous_col.get()
        
        if not filepath or not current_col or not previous_col:
            showinfo('提示', '请先选择文件和指定分析列')
            return
        
        # 使用线程进行分析,避免UI冻结
        analysis_thread = threading.Thread(target=self._run_analysis_thread)
        analysis_thread.daemon = True
        analysis_thread.start()
   
    def _set_progress(self, value):
        """设置进度条值的辅助函数"""
        self.progress_bar['value'] = value
   
    def _run_comparison_analysis(self, df, current_col, previous_col):
        """执行两列对比分析(同比/环比)"""
        # 检查列是否存在
        if current_col not in df.columns or previous_col not in df.columns:
            self.root.after(0, lambda: showinfo('错误', '选择的列不存在于数据中'))
            self.root.after(0, lambda: self.status_var.set('分析失败'))
            return None
        self.root.after(0, lambda: self.status_var.set('处理数据...'))
        
        # 大文件优化:分块处理
        chunk_size = 50000
        row_count = len(df)
        
        if row_count > chunk_size:
            # 分块处理
            chunks = []
            total_chunks = (row_count + chunk_size - 1) // chunk_size
            
            for i in range(0, row_count, chunk_size):
                chunk = df.iloc[i:i+chunk_size].copy()
               
                # 确保两列都是数值类型
                chunk[current_col] = pd.to_numeric(chunk[current_col], errors='coerce')
                chunk[previous_col] = pd.to_numeric(chunk[previous_col], errors='coerce')
               
                # 计算同比/环比变化率
                chunk['当前期值'] = chunk[current_col]
                chunk['对比期值'] = chunk[previous_col]
                chunk['变化量'] = chunk[current_col] - chunk[previous_col]
               
                # 处理除零错误,使用pandas的mask函数
                chunk['变化率(%)'] = (chunk['变化量'] / chunk[previous_col]) * 100
                chunk['变化率(%)'] = chunk['变化率(%)'].mask(chunk[previous_col] == 0, pd.NA)
               
                chunks.append(chunk[['当前期值', '对比期值', '变化量', '变化率(%)']])
               
                # 更新进度
                progress = 20 + (i / row_count) * 60
                self.root.after(0, lambda p=progress: self._set_progress(p))
               
            # 合并所有块
            result_df = pd.concat(chunks)
        else:
            # 小文件直接处理
            # 确保两列都是数值类型
            df[current_col] = pd.to_numeric(df[current_col], errors='coerce')
            df[previous_col] = pd.to_numeric(df[previous_col], errors='coerce')
            
            # 计算同比/环比变化率
            df['当前期值'] = df[current_col]
            df['对比期值'] = df[previous_col]
            df['变化量'] = df[current_col] - df[previous_col]
            
            # 处理除零错误,使用pandas的mask函数
            df['变化率(%)'] = (df['变化量'] / df[previous_col]) * 100
            df['变化率(%)'] = df['变化率(%)'].mask(df[previous_col] == 0, pd.NA)
            
            result_df = df[['当前期值', '对比期值', '变化量', '变化率(%)']].copy()
        
        self.root.after(0, lambda: self._set_progress(80))
        return result_df
    def _run_time_series_analysis(self, df, current_col, time_col_name):
        """执行时间序列环比分析"""
        # 检测数据结构并自动处理横向数据
        # 横向数据:第一行是月份,指定行是数据
        # 我们需要转置数据以便正确计算环比
        try:
            data_row_idx = int(self.data_row.get()) - 1  # 转换为0索引
            
            # 横向数据检测逻辑增强
            if len(df) > data_row_idx and len(df.columns) > 2:
                # 横向数据结构:月份在行0,数据在行data_row_idx
                # 提取月份行和数据行
                months = df.iloc[0].tolist()
                values = df.iloc[data_row_idx].tolist()
               
                # 创建转置后的DataFrame
                transposed_df = pd.DataFrame({
                    time_col_name: months,
                    current_col: values
                })
               
                # 过滤掉空值行
                transposed_df = transposed_df.dropna(subset=[time_col_name, current_col])
               
                # 确保数据列是数值类型
                transposed_df[current_col] = pd.to_numeric(transposed_df[current_col], errors='coerce')
               
                # 使用转置后的数据
                processed_df = transposed_df
            else:
                # 传统的纵向数据结构
                # 检查列是否存在
                if current_col not in df.columns or time_col_name not in df.columns:
                    self.root.after(0, lambda: showinfo('错误', '选择的数据列或时间列不存在于数据中'))
                    self.root.after(0, lambda: self.status_var.set('分析失败'))
                    return None
               
                # 确保数据列是数值类型
                df[current_col] = pd.to_numeric(df[current_col], errors='coerce')
               
                processed_df = df.copy()
        except Exception as e:
            # 处理行索引转换错误
            self.root.after(0, lambda: showinfo('错误', f'数据行索引设置错误: {str(e)}'))
            self.root.after(0, lambda: self.status_var.set('分析失败'))
            return None
        
        # 复制原始数据用于显示
        result_df = processed_df.copy()
        
        # 重命名列以便显示
        result_df['当前期值'] = result_df[current_col]
        
        # 构建环比计算结果:按用户需求,后面的月份对应前面的月份
        # 例如12月对应11月,7月对应6月等
        comparison_results = []
        try:
            # 确保processed_df不为空且至少有两行数据
            if len(processed_df) >= 2:
                for i in range(1, len(processed_df)):
                    # 安全地获取当前值和前一个值
                    if i = 0:
                        current_value = processed_df.iloc[current_col]
                        previous_value = processed_df.iloc[i-1][current_col]
                        current_month = processed_df.iloc[time_col_name]
                        previous_month = processed_df.iloc[i-1][time_col_name]
                        
                        if pd.notna(current_value) and pd.notna(previous_value):
                            change_amount = current_value - previous_value
                            if previous_value != 0:
                                change_rate = (change_amount / previous_value) * 100
                            else:
                                change_rate = pd.NA
                        else:
                            change_amount = pd.NA
                            change_rate = pd.NA
                        
                        # 构建环比结果行
                        comparison_row = {
                            time_col_name: f'{current_month}环比{previous_month}',
                            '当前期值': current_value,
                            '对比期值': previous_value,
                            '变化量': change_amount,
                            '变化率(%)': change_rate
                        }
                        comparison_results.append(comparison_row)
        except Exception as e:
            self.root.after(0, lambda: showinfo('错误', f'环比计算错误: {str(e)}'))
            self.root.after(0, lambda: self.status_var.set('分析失败'))
            return None
        
        # 创建环比结果DataFrame
        comparison_df = pd.DataFrame(comparison_results)
        
        # 添加分割行以便区分原始数据和分析结果
        separator_row = pd.DataFrame({
            time_col_name: ['----------'],
            '当前期值': [pd.NA],
            '对比期值': [pd.NA],
            '变化量': [pd.NA],
            '变化率(%)': [pd.NA]
        })
        
        # 构造最终结果DataFrame
        result_df = pd.concat([
            result_df[[time_col_name, '当前期值']],  # 原始数据
            separator_row,                          # 分割线
            comparison_df                           # 环比分析结果
        ]).reset_index(drop=True)
        
        # 确保所有结果列都存在
        for col in ['对比期值', '变化量', '变化率(%)']:
            if col not in result_df.columns:
                result_df[col] = pd.NA
        
        return result_df
    def _run_analysis_thread(self):
        filepath = self.file_path.get()
        current_col = self.current_col.get()
        previous_col = self.previous_col.get()
        sheet = self.sheet_name.get()
        
        try:
            # 更新状态
            self.root.after(0, lambda: self.status_var.set('开始分析...'))
            self.root.after(0, lambda: self._set_progress(0))
            
            # 使用已经加载的过滤后数据,避免重复读取
            if self.filtered_df is not None:
                df = self.filtered_df.copy()
            else:
                # 如果没有过滤数据,则重新读取
                self.root.after(0, lambda: self.status_var.set('读取数据...'))
                try:
                    df = pd.read_excel(filepath, sheet_name=sheet, engine='openpyxl')
                except FileNotFoundError:
                    self.root.after(0, lambda: showinfo('错误', f'找不到文件: {filepath}'))
                    self.root.after(0, lambda: self.status_var.set('分析失败: 文件不存在'))
                    return
                except Exception as e:
                    self.root.after(0, lambda: showinfo('错误', f'读取文件时出错: {str(e)}'))
                    self.root.after(0, lambda: self.status_var.set(f'分析失败: {str(e)}'))
                    return
            
            self.root.after(0, lambda: self._set_progress(20))
            
            # 根据选择的分析类型执行相应分析
            analysis_type = self.analysis_type.get()
            result_df = None
            
            if analysis_type == 'time_series':
                # 时间序列环比分析
                time_col_name = self.time_col.get()
               
                self.root.after(0, lambda: self.status_var.set('处理时间序列数据...'))
                result_df = self._run_time_series_analysis(df, current_col, time_col_name)
                if result_df is None:
                    return
               
            elif analysis_type == 'comparison':
                # 传统的两列对比分析
                result_df = self._run_comparison_analysis(df, current_col, previous_col)
                if result_df is None:
                    return
            else:
                # 如果没有选择分析方式,显示提示
                self.root.after(0, lambda: showinfo('提示', '请选择分析方式'))
                self.root.after(0, lambda: self.status_var.set('准备就绪'))
                return
            
            # 计算合计值,优化计算性能
            self.root.after(0, lambda: self.status_var.set('计算汇总数据...'))
            
            if analysis_type == 'time_series':
                # 对于时间序列环比分析,只计算环比结果部分的合计
                # 找出环比结果开始的位置(从分割行之后开始)
                try:
                    time_col_name = self.time_col.get()
                    
                    # 安全地查找分隔符索引
                    if time_col_name in result_df.columns:
                        separator_index = result_df.index[result_df[time_col_name] == '----------'].tolist()
                        
                        if separator_index and len(separator_index) > 0 and separator_index[0]+1  10000:
                            export_df.to_excel(export_path, index=True, engine='xlsxwriter')
                        else:
                            export_df.to_excel(export_path, index=True)
                        
                        showinfo('成功', f'结果已保存到 {export_path}')
                    except Exception as e:
                        showinfo('导出错误', f'导出失败: {str(e)}')
            
            # 在主线程中执行文件对话框
            self.root.after(100, export_result)
        except Exception as e:
            self.root.after(0, lambda: showinfo('错误', f'分析失败: {str(e)}'))
            self.root.after(0, lambda: self.status_var.set('分析失败'))
            self.root.after(0, lambda: self._set_progress(0))
if __name__ == '__main__':
    app = ExcelAnalysisApp()

宋体, 数据

xiaowang667
OP
  

通过网盘分享的文件:excel_analysis_tool_project.zip
链接: https://pan.baidu.com/s/1THxsndF4IOcabarbuxrDpg?pwd=xgcb 提取码: xgcb
--来自百度网盘超级会员v6的分享
第二版本改版已经上传
xiaowang667
OP
  


xiaowang667 发表于 2025-9-8 10:25
通过网盘分享的文件:excel_analysis_tool_project.zip
链接: https://pan.baidu.com/s/1THxsndF4IOcabarb ...

第二版才用模块化代码处理,不仅能打包,还能灵活地以“库”或“可执行文件”形式分发,另外新增聚类分析功能
cjw89   

谢谢分享,来试试效果
huowulianyu   

谢谢分享,下载学习学习
applemsjstc   

果然panda库是excel处理的神器,DataFrame比excel更加灵活,我先mark了
您需要登录后才可以回帖 登录 | 立即注册