Excel数据分析工具 产品介绍与操作说明
产品介绍
Excel数据分析工具是一款专门用于快速分析Excel数据的桌面应用程序,旨在帮助用户轻松进行同比、环比等数据分析工作。无需复杂的Excel公式或函数,只需简单几步操作,即可获得专业的数据分析结果。
核心功能
两列对比分析:快速对比两列数据,自动计算变化量和变化率(适用于同比、环比分析)
时间序列环比分析:支持纵向和横向时间序列数据,自动计算连续期间的环比变化
数据模板生成:提供Excel和CSV两种格式的数据模板,帮助用户规范数据格式
数据过滤:支持自定义过滤条件,精准筛选分析数据
大文件优化:针对大型数据集进行性能优化,确保流畅的分析体验
结果导出:分析结果可直接导出为Excel文件,便于进一步处理和分享
应用场景
企业财务数据分析(销售额、利润等指标的同比/环比分析)
业务运营数据监控(月度/季度/年度业绩变化趋势分析)
市场销售数据比较(不同产品、区域或渠道的业绩对比)
任何需要快速计算数据变化量和变化率的场景
操作说明
1. 启动程序
双击运行`excel_analysis_tool.py`文件即可启动程序。程序启动后,将显示主界面,包括文件选择、数据区域选择、分析选项、高级选项和数据预览区域。
2. 数据准备
程序提供了两种方式来准备分析数据:
方式一:使用数据模板
1. 在主界面的"高级选项"区域,点击"获取数据模板"按钮
2. 在弹出的对话框中选择模板格式:
选择"是":获取Excel格式模板(需要安装xlsxwriter模块)
选择"否":获取CSV格式模板(无需额外依赖)
3. 如果选择Excel格式,系统会生成包含三种数据格式的模板:
两列对比分析模板
时间序列模板(纵向)
时间序列模板(横向)
使用说明
4. 如果选择CSV格式,系统会进一步询问模板类型,可选择:
两列对比分析模板
时间序列模板
5. 根据模板中的示例数据,按照相同格式填写您的实际数据
方式二:直接使用现有Excel文件
确保您的Excel文件符合以下要求:
数据列必须为数值类型
不要在数据中包含合并单元格
对于时间序列分析,需要有时间列(如月份、季度等)
3. 加载数据
1. 在"Excel文件选择"区域,点击"浏览"按钮
2. 选择要分析的Excel文件(仅支持.xlsx格式)
3. 系统会自动读取文件中的所有Sheet,并在Sheet下拉框中显示
4. 选择要分析的Sheet,系统将自动加载该Sheet的列信息
5. 加载完成后,数据预览区域将显示前100行数据(可在高级选项中调整预览行数)
4. 设置分析参数
根据您选择的分析类型,设置相应的参数:
两列对比分析
1. 在"分析选项"区域,选择"两列对比分析(同比/环比)"
2. 在"数据区域选择"中:
选择"当前期数据列":要分析的当前期数据所在列
选择"对比期数据列":用于对比的上期数据所在列
时间序列环比分析
1. 在"分析选项"区域,选择"时间序列环比分析(后一个月与前一个月对比)"
2. 在"数据区域选择"中:
选择"当前期数据列":要分析的数值数据所在列
选择"时间列":时间信息所在列
设置"当前数据行(横向数据)":如果是横向数据结构,设置数据所在行号(默认为第2行)
5. 应用数据过滤(可选)
1. 在"高级选项"区域的"数据过滤"输入框中,输入过滤表达式
2. 点击"应用过滤"按钮,系统将根据过滤条件筛选数据
3. 数据预览区域将显示过滤后的结果
4. 若要清除过滤,清空过滤输入框并点击"应用过滤"按钮
6. 执行分析
1. 点击主界面底部的"开始分析"按钮
2. 系统将在后台线程中执行分析,进度条显示分析进度
3. 分析完成后,结果将显示在数据预览区域,并自动弹出保存对话框
4. 选择保存路径,将分析结果导出为Excel文件
7. 查看分析结果
分析结果包含以下信息:
两列对比分析:当前期值、对比期值、变化量、变化率(%)、合计行
时间序列分析:原始时间序列数据、分隔线、各期间环比分析结果、合计行
高级功能说明
数据模板功能详解
Excel格式模板
两列对比分析模板:适用于直接比较两列数据的情况,如本期与上期销售额对比
时间序列模板(纵向):适用于时间数据和数值数据分别在不同列的情况
时间序列模板(横向):适用于时间数据在行、数值数据在列的情况
使用说明:包含各类模板的详细使用指南
CSV格式模板
两列对比分析模板:简单的CSV格式,包含产品名称、本期销售额、上期销售额三列
时间序列模板:简单的CSV格式,包含月份和销售额两列
数据过滤表达式语法
过滤表达式使用pandas的eval语法,支持以下操作:
- 比较运算符:`>`, `=`, `
- 逻辑运算符:`and`, `or`, `not`
- 列名引用:直接使用列名,如`销售额 > 10000`
- 复合表达式:如`(销售额 > 10000) and (毛利率
大文件处理
程序对大文件(超过10000行或50列)进行了特殊优化:
自动调整预览行数,避免界面卡顿
使用分块处理技术,提高数据分析效率
导出时自动选择高效的引擎
注意事项
1. 确保您的Excel文件格式正确,不包含复杂的合并单元格或特殊格式
2. 进行Excel格式模板生成时,需安装xlsxwriter模块(可通过`pip install xlsxwriter`安装)
3. 对于大型数据集,分析过程可能需要较长时间,请耐心等待
4. 如遇程序无响应,请检查任务管理器中是否有多个Python进程运行
5. 如有其他问题,请查看程序界面底部的状态提示或错误信息
常见问题解答
**Q: 为什么选择Excel格式模板时提示需要安装xlsxwriter?**
A: 生成Excel格式模板需要使用xlsxwriter库。您可以通过命令行执行`pip install xlsxwriter`来安装。如果您不想安装额外模块,可以选择CSV格式模板。
**Q: 时间序列分析支持哪些时间格式?**
A: 程序支持各种常见的时间格式,如"1月"、"2月"、"2023-01"、"2023/01"等。只要时间序列是连续的,程序就能正确计算环比。
**Q: 如何处理分析结果中的N/A值?**
A: N/A值通常表示无法计算变化率(如对比期值为0的情况)。您可以在导出的Excel文件中手动处理这些值。
**Q: 程序支持Mac或Linux系统吗?**
A: 理论上支持,只要安装了Python和必要的依赖库。但程序主要是为Windows系统优化设计的。
[color=]因为我不会打包所以我把python项目做成压缩包传百度网盘了,希望大家理解
[color=]通过网盘分享的文件:excel_analysis_tool.zip
[color=]链接: https://pan.baidu.com/s/1Degtrsa3PIDJNTrUgSRZsg?pwd=a14w 提取码: a14w
[color=]--来自百度网盘超级会员v6的分享
以下是运行效果:

image.png (36.94 KB, 下载次数: 2)
下载附件
2025-8-29 14:20 上传
[Python] 纯文本查看 复制代码import pandas as pd
import pandas as pd
import tkinter as tk
from tkinter import filedialog, ttk
from tkinter.messagebox import showinfo, askyesno
import time
import threading
class ExcelAnalysisApp:
def __init__(self):
self.root = tk.Tk()
self.root.title('Excel数据分析工具')
self.root.geometry('900x700')
# 文件选择区域
self.file_frame = ttk.LabelFrame(self.root, text='Excel文件选择')
self.file_frame.pack(pady=10, padx=10, fill='x')
self.file_path = tk.StringVar()
ttk.Entry(self.file_frame, textvariable=self.file_path, width=50).pack(side='left', padx=5)
ttk.Button(self.file_frame, text='浏览', command=self.select_file).pack(side='left')
# Sheet选择
ttk.Label(self.file_frame, text='Sheet:').pack(side='left', padx=5)
self.sheet_name = tk.StringVar()
self.sheet_combo = ttk.Combobox(self.file_frame, textvariable=self.sheet_name, width=15)
self.sheet_combo.pack(side='left', padx=5)
self.sheet_combo.bind('>', self.on_sheet_selected)
# 区域选择
self.range_frame = ttk.LabelFrame(self.root, text='数据区域选择')
self.range_frame.pack(pady=10, padx=10, fill='x')
ttk.Label(self.range_frame, text='当前期数据列:').pack(side='left', padx=5)
self.current_col = ttk.Combobox(self.range_frame, width=15)
self.current_col.pack(side='left', padx=5)
ttk.Label(self.range_frame, text='对比期数据列:').pack(side='left', padx=5)
self.previous_col = ttk.Combobox(self.range_frame, width=15)
self.previous_col.pack(side='left', padx=5)
# 添加数据行选择
ttk.Label(self.range_frame, text='当前数据行(横向数据):').pack(side='left', padx=5)
self.data_row = tk.StringVar(value='2')
ttk.Entry(self.range_frame, textvariable=self.data_row, width=5).pack(side='left', padx=5)
ttk.Label(self.range_frame, text='目标分析行(环比对比):').pack(side='left', padx=5)
self.target_analysis_row = tk.StringVar(value='3')
ttk.Entry(self.range_frame, textvariable=self.target_analysis_row, width=5).pack(side='left', padx=5)
# 分析选项
self.analysis_frame = ttk.LabelFrame(self.root, text='分析选项')
self.analysis_frame.pack(pady=10, padx=10, fill='x')
# 使用单选按钮替代复选框,确保二选一
self.analysis_type = tk.StringVar(value='comparison')
# 创建一个框架来放置单选按钮,使其对齐更好
radio_frame = ttk.Frame(self.analysis_frame)
radio_frame.pack(anchor='w', pady=5)
ttk.Radiobutton(radio_frame, text='两列对比分析(同比/环比)', variable=self.analysis_type, value='comparison').pack(anchor='w', padx=10)
ttk.Radiobutton(radio_frame, text='时间序列环比分析(后一个月与前一个月对比)', variable=self.analysis_type, value='time_series').pack(anchor='w', padx=10)
# 时间列选择
ttk.Label(self.analysis_frame, text='时间列:').pack(side='left', padx=5, pady=5)
self.time_col = ttk.Combobox(self.analysis_frame, width=15)
self.time_col.pack(side='left', padx=5, pady=5)
# 高级选项
advanced_frame = ttk.LabelFrame(self.root, text='高级选项')
advanced_frame.pack(pady=10, padx=10, fill='x')
ttk.Label(advanced_frame, text='预览行数:').pack(side='left', padx=5)
self.preview_rows = tk.StringVar(value='100')
ttk.Entry(advanced_frame, textvariable=self.preview_rows, width=5).pack(side='left', padx=5)
ttk.Label(advanced_frame, text='数据过滤:').pack(side='left', padx=5)
self.filter_var = tk.StringVar()
ttk.Entry(advanced_frame, textvariable=self.filter_var, width=30).pack(side='left', padx=5)
ttk.Button(advanced_frame, text='应用过滤', command=self.apply_filter).pack(side='left', padx=5)
# 获取数据模板按钮
ttk.Button(advanced_frame, text='获取数据模板', command=self.get_data_template).pack(side='right', padx=10, pady=5)
# 执行按钮
ttk.Button(self.root, text='开始分析', command=self.run_analysis).pack(pady=10)
# 状态显示
self.status_var = tk.StringVar()
self.status_var.set('准备就绪')
self.status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
# 进度条
self.progress_var = tk.DoubleVar()
self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=100)
self.progress_bar.pack(fill=tk.X, padx=10, pady=5)
self.progress_bar['value'] = 0
# 数据预览
self.preview_frame = ttk.LabelFrame(self.root, text='数据预览')
self.preview_frame.pack(pady=10, padx=10, fill='both', expand=True)
# 添加滚动条
self.tree_frame = ttk.Frame(self.preview_frame)
self.tree_frame.pack(fill='both', expand=True)
self.tree_scroll_y = ttk.Scrollbar(self.tree_frame, orient=tk.VERTICAL)
self.tree_scroll_x = ttk.Scrollbar(self.tree_frame, orient=tk.HORIZONTAL)
self.tree = ttk.Treeview(self.tree_frame, yscrollcommand=self.tree_scroll_y.set,
xscrollcommand=self.tree_scroll_x.set)
self.tree_scroll_y.config(command=self.tree.yview)
self.tree_scroll_x.config(command=self.tree.xview)
self.tree_scroll_y.pack(side=tk.RIGHT, fill=tk.Y)
self.tree_scroll_x.pack(side=tk.BOTTOM, fill=tk.X)
self.tree.pack(fill='both', expand=True)
# 数据存储
self.current_df = None
self.filtered_df = None
self.root.mainloop()
def select_file(self):
filepath = filedialog.askopenfilename(filetypes=[('Excel文件', '*.xlsx')])
if filepath:
self.file_path.set(filepath)
try:
# 获取所有sheet名称
xl = pd.ExcelFile(filepath)
sheet_names = xl.sheet_names
self.sheet_combo['values'] = sheet_names
if sheet_names:
self.sheet_name.set(sheet_names[0])
self.on_sheet_selected(None)
except Exception as e:
showinfo('错误', f'读取文件信息失败: {str(e)}')
def on_sheet_selected(self, event):
filepath = self.file_path.get()
sheet = self.sheet_name.get()
if filepath and sheet:
self.load_columns(filepath, sheet)
def get_data_template(self):
"""生成并下载数据模板(支持Excel和CSV格式)"""
try:
# 询问用户希望的模板格式
response = askyesno('选择模板格式', '是否需要Excel格式的模板?\n\n(选择"是"需要安装xlsxwriter模块,选择"否"将生成CSV格式模板,无需额外依赖)')
if response:
# 用户选择Excel格式
try:
# 尝试导入xlsxwriter以验证是否安装
import xlsxwriter
# 创建一个ExcelWriter对象
file_path = filedialog.asksaveasfilename(
defaultextension='.xlsx',
filetypes=[('Excel文件', '*.xlsx')],
title='保存Excel数据模板'
)
if not file_path:
return
# 创建一个工作簿
with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
# 创建两列对比分析的模板
comparison_data = {
'产品名称': ['产品A', '产品B', '产品C', '产品D'],
'本期销售额': [10000, 15000, 8000, 12000],
'上期销售额': [9000, 16000, 7500, 11000]
}
comparison_df = pd.DataFrame(comparison_data)
comparison_df.to_excel(writer, sheet_name='两列对比分析模板', index=False)
# 创建时间序列环比分析的模板(纵向)
time_series_vertical_data = {
'月份': ['1月', '2月', '3月', '4月', '5月', '6月'],
'销售额': [10000, 12000, 11000, 13000, 15000, 14000]
}
time_series_vertical_df = pd.DataFrame(time_series_vertical_data)
time_series_vertical_df.to_excel(writer, sheet_name='时间序列模板(纵向)', index=False)
# 创建时间序列环比分析的模板(横向)
time_series_horizontal_data = {
'指标': ['月份', '销售额'],
'1月': ['1月', 10000],
'2月': ['2月', 12000],
'3月': ['3月', 11000],
'4月': ['4月', 13000],
'5月': ['5月', 15000],
'6月': ['6月', 14000]
}
time_series_horizontal_df = pd.DataFrame(time_series_horizontal_data)
time_series_horizontal_df.to_excel(writer, sheet_name='时间序列模板(横向)', index=False)
# 添加说明Sheet
instructions_data = {
'说明项': ['两列对比分析使用方法', '时间序列环比分析(纵向)使用方法', '时间序列环比分析(横向)使用方法', '注意事项'],
'详细说明': [
'在"两列对比分析模板"Sheet中,填写需要对比的两列数据,如本期和上期数据。使用时选择对应的列即可。',
'在"时间序列模板(纵向)"Sheet中,第一列填写时间(如月份),第二列填写对应的数据。使用时选择时间列和数据列。',
'在"时间序列模板(横向)"Sheet中,第一行填写月份,第二行填写对应的数据。使用时设置数据行为2(因为第一行是表头)。',
'1. 数据列必须为数值类型;2. 不要在数据中包含合并单元格;3. 使用前请确保已安装必要的Python库。'
]
}
instructions_df = pd.DataFrame(instructions_data)
instructions_df.to_excel(writer, sheet_name='使用说明', index=False)
showinfo('成功', f'Excel数据模板已保存到 {file_path}\n\n模板包含三种数据格式:\n1. 两列对比分析模板(适用于同比/环比)\n2. 时间序列模板(纵向)(适用于时间序列环比分析)\n3. 时间序列模板(横向)(适用于时间序列环比分析)')
except ImportError:
# xlsxwriter未安装,提供CSV替代方案
self._generate_csv_template()
else:
# 用户直接选择CSV格式
self._generate_csv_template()
except Exception as e:
showinfo('错误', f'生成模板时出错: {str(e)}')
print(f'生成模板时出错: {str(e)}')
def _generate_csv_template(self):
"""生成CSV格式的数据模板(无需额外依赖)"""
try:
# 询问用户想要哪种类型的CSV模板
response = askyesno('选择CSV模板类型', '需要两列对比分析的模板吗?\n\n(选择"是"生成两列对比模板,选择"否"生成时间序列模板)')
if response:
# 两列对比分析模板
file_path = filedialog.asksaveasfilename(
defaultextension='.csv',
filetypes=[('CSV文件', '*.csv')],
title='保存两列对比分析模板'
)
if not file_path:
return
comparison_data = {
'产品名称': ['产品A', '产品B', '产品C', '产品D'],
'本期销售额': [10000, 15000, 8000, 12000],
'上期销售额': [9000, 16000, 7500, 11000]
}
comparison_df = pd.DataFrame(comparison_data)
comparison_df.to_csv(file_path, index=False, encoding='utf-8-sig')
showinfo('成功', f'两列对比分析CSV模板已保存到 {file_path}\n\n使用说明:\n1. 填写需要对比的两列数据(如本期和上期数据)\n2. 确保数据列为数值类型\n3. 不要包含合并单元格\n4. 使用程序时选择对应的列即可进行同比/环比分析')
else:
# 时间序列模板(纵向)
file_path = filedialog.asksaveasfilename(
defaultextension='.csv',
filetypes=[('CSV文件', '*.csv')],
title='保存时间序列模板'
)
if not file_path:
return
time_series_data = {
'月份': ['1月', '2月', '3月', '4月', '5月', '6月'],
'销售额': [10000, 12000, 11000, 13000, 15000, 14000]
}
time_series_df = pd.DataFrame(time_series_data)
time_series_df.to_csv(file_path, index=False, encoding='utf-8-sig')
showinfo('成功', f'时间序列CSV模板已保存到 {file_path}\n\n使用说明:\n1. 第一列填写时间(如月份)\n2. 第二列填写对应的数据\n3. 确保数据列为数值类型\n4. 使用程序时选择时间列和数据列进行时间序列环比分析')
except Exception as e:
showinfo('错误', f'生成CSV模板时出错: {str(e)}')
print(f'生成CSV模板时出错: {str(e)}')
def load_columns(self, filepath, sheet):
try:
# 对于大文件,使用read_excel的优化选项
df = pd.read_excel(filepath, sheet_name=sheet, engine='openpyxl')
# 保存当前数据
self.current_df = df
self.filtered_df = df.copy()
# 设置列选择器
columns = [str(col) for col in df.columns]
self.current_col['values'] = columns
self.previous_col['values'] = columns
self.time_col['values'] = columns
# 检查数据大小,对大数据集进行特殊处理
row_count, col_count = df.shape
if row_count > 10000 or col_count > 50:
self.status_var.set(f'已加载大文件: {row_count}行, {col_count}列,请选择分析列')
# 只预览前N行,N由用户设置
try:
preview_rows = int(self.preview_rows.get())
except ValueError:
preview_rows = 100
self.show_preview(df.head(preview_rows))
# 提醒用户这是大文件
askyesno('提示', f'这是一个大文件({row_count}行),分析可能需要较长时间,是否继续?')
else:
self.status_var.set(f'文件已加载: {row_count}行, {col_count}列,请选择分析列')
# 显示数据预览
try:
preview_rows = int(self.preview_rows.get())
except ValueError:
preview_rows = 100
self.show_preview(df.head(preview_rows))
except Exception as e:
showinfo('错误', f'加载文件失败: {str(e)}')
self.status_var.set('加载失败')
def apply_filter(self):
if self.current_df is None:
showinfo('提示', '请先加载文件')
return
filter_expr = self.filter_var.get()
if not filter_expr:
self.filtered_df = self.current_df.copy()
self.status_var.set('已清除过滤条件')
try:
preview_rows = int(self.preview_rows.get())
except ValueError:
preview_rows = 100
self.show_preview(self.filtered_df.head(preview_rows))
return
try:
# 使用eval进行数据过滤
mask = self.current_df.eval(filter_expr)
self.filtered_df = self.current_df[mask]
# 显示过滤结果
row_count = len(self.filtered_df)
self.status_var.set(f'已应用过滤,显示 {row_count} 行数据')
try:
preview_rows = int(self.preview_rows.get())
except ValueError:
preview_rows = 100
self.show_preview(self.filtered_df.head(preview_rows))
except Exception as e:
showinfo('过滤错误', f'过滤表达式错误: {str(e)}')
def show_preview(self, df):
# 清空现有数据
for i in self.tree.get_children():
self.tree.delete(i)
if df.empty:
self.status_var.set('没有数据可显示')
return
# 优化列显示,处理大量列的情况
max_cols = 50 # 限制最多显示的列数
if len(df.columns) > max_cols:
# 只显示前max_cols列
cols_to_display = list(df.columns)[:max_cols]
# 添加一列表示还有隐藏的列
cols_to_display.append(f'+{len(df.columns) - max_cols} 列')
df_display = df[df.columns[:max_cols]].copy()
# 添加一个空列作为占位符
df_display[f'+{len(df.columns) - max_cols} 列'] = ''
else:
cols_to_display = list(df.columns)
df_display = df.copy()
# 设置列
self.tree['columns'] = cols_to_display
for col in cols_to_display:
# 确保列名显示为原始文本
col_name = str(col)
self.tree.heading(col, text=col_name)
# 动态调整列宽
if col in df.columns:
# 找出该列中最长的字符串长度
if pd.api.types.is_numeric_dtype(df[col]):
# 数值列固定宽度
width = 100
else:
# 文本列根据内容调整
max_len = max(df[col].astype(str).apply(len).max(), len(col_name))
width = min(max_len * 10 + 20, 300) # 限制最大宽度
else:
width = 100
self.tree.column(col, width=width, anchor='center')
# 确保数据正确显示
# 创建DataFrame的副本,避免SettingWithCopyWarning
df_display = df_display.copy()
# 优化数据转换逻辑
for col in df_display.columns:
if col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
# 数值类型处理,格式化显示
df_display[col] = df_display[col].apply(lambda x: f'{x:.2f}' if pd.notna(x) and isinstance(x, (int, float)) else '')
else:
# 非数值类型确保显示为字符串
df_display[col] = df_display[col].astype(str).fillna('')
else:
df_display[col] = ''
# 分批添加数据,优化性能
batch_size = 500 # 每批处理的行数
total_rows = len(df_display)
# 对于大量数据,只显示前N行
try:
preview_rows = int(self.preview_rows.get())
except ValueError:
preview_rows = 100
display_rows = min(total_rows, preview_rows)
for i in range(0, display_rows, batch_size):
end_idx = min(i + batch_size, display_rows)
batch = df_display.iloc[i:end_idx]
for _, row in batch.iterrows():
# 转换所有值为字符串,确保中文显示
str_values = [str(x) if x is not None and str(x) != 'nan' else '' for x in row]
self.tree.insert('', 'end', values=str_values)
# 如果有更多行没有显示,添加一个提示行
if total_rows > display_rows:
self.tree.insert('', 'end', values=[f'... 还有 {total_rows - display_rows} 行未显示 ...'] + [''] * (len(cols_to_display) - 1))
def run_analysis(self):
filepath = self.file_path.get()
current_col = self.current_col.get()
previous_col = self.previous_col.get()
if not filepath or not current_col or not previous_col:
showinfo('提示', '请先选择文件和指定分析列')
return
# 使用线程进行分析,避免UI冻结
analysis_thread = threading.Thread(target=self._run_analysis_thread)
analysis_thread.daemon = True
analysis_thread.start()
def _set_progress(self, value):
"""设置进度条值的辅助函数"""
self.progress_bar['value'] = value
def _run_comparison_analysis(self, df, current_col, previous_col):
"""执行两列对比分析(同比/环比)"""
# 检查列是否存在
if current_col not in df.columns or previous_col not in df.columns:
self.root.after(0, lambda: showinfo('错误', '选择的列不存在于数据中'))
self.root.after(0, lambda: self.status_var.set('分析失败'))
return None
self.root.after(0, lambda: self.status_var.set('处理数据...'))
# 大文件优化:分块处理
chunk_size = 50000
row_count = len(df)
if row_count > chunk_size:
# 分块处理
chunks = []
total_chunks = (row_count + chunk_size - 1) // chunk_size
for i in range(0, row_count, chunk_size):
chunk = df.iloc[i:i+chunk_size].copy()
# 确保两列都是数值类型
chunk[current_col] = pd.to_numeric(chunk[current_col], errors='coerce')
chunk[previous_col] = pd.to_numeric(chunk[previous_col], errors='coerce')
# 计算同比/环比变化率
chunk['当前期值'] = chunk[current_col]
chunk['对比期值'] = chunk[previous_col]
chunk['变化量'] = chunk[current_col] - chunk[previous_col]
# 处理除零错误,使用pandas的mask函数
chunk['变化率(%)'] = (chunk['变化量'] / chunk[previous_col]) * 100
chunk['变化率(%)'] = chunk['变化率(%)'].mask(chunk[previous_col] == 0, pd.NA)
chunks.append(chunk[['当前期值', '对比期值', '变化量', '变化率(%)']])
# 更新进度
progress = 20 + (i / row_count) * 60
self.root.after(0, lambda p=progress: self._set_progress(p))
# 合并所有块
result_df = pd.concat(chunks)
else:
# 小文件直接处理
# 确保两列都是数值类型
df[current_col] = pd.to_numeric(df[current_col], errors='coerce')
df[previous_col] = pd.to_numeric(df[previous_col], errors='coerce')
# 计算同比/环比变化率
df['当前期值'] = df[current_col]
df['对比期值'] = df[previous_col]
df['变化量'] = df[current_col] - df[previous_col]
# 处理除零错误,使用pandas的mask函数
df['变化率(%)'] = (df['变化量'] / df[previous_col]) * 100
df['变化率(%)'] = df['变化率(%)'].mask(df[previous_col] == 0, pd.NA)
result_df = df[['当前期值', '对比期值', '变化量', '变化率(%)']].copy()
self.root.after(0, lambda: self._set_progress(80))
return result_df
def _run_time_series_analysis(self, df, current_col, time_col_name):
"""执行时间序列环比分析"""
# 检测数据结构并自动处理横向数据
# 横向数据:第一行是月份,指定行是数据
# 我们需要转置数据以便正确计算环比
try:
data_row_idx = int(self.data_row.get()) - 1 # 转换为0索引
# 横向数据检测逻辑增强
if len(df) > data_row_idx and len(df.columns) > 2:
# 横向数据结构:月份在行0,数据在行data_row_idx
# 提取月份行和数据行
months = df.iloc[0].tolist()
values = df.iloc[data_row_idx].tolist()
# 创建转置后的DataFrame
transposed_df = pd.DataFrame({
time_col_name: months,
current_col: values
})
# 过滤掉空值行
transposed_df = transposed_df.dropna(subset=[time_col_name, current_col])
# 确保数据列是数值类型
transposed_df[current_col] = pd.to_numeric(transposed_df[current_col], errors='coerce')
# 使用转置后的数据
processed_df = transposed_df
else:
# 传统的纵向数据结构
# 检查列是否存在
if current_col not in df.columns or time_col_name not in df.columns:
self.root.after(0, lambda: showinfo('错误', '选择的数据列或时间列不存在于数据中'))
self.root.after(0, lambda: self.status_var.set('分析失败'))
return None
# 确保数据列是数值类型
df[current_col] = pd.to_numeric(df[current_col], errors='coerce')
processed_df = df.copy()
except Exception as e:
# 处理行索引转换错误
self.root.after(0, lambda: showinfo('错误', f'数据行索引设置错误: {str(e)}'))
self.root.after(0, lambda: self.status_var.set('分析失败'))
return None
# 复制原始数据用于显示
result_df = processed_df.copy()
# 重命名列以便显示
result_df['当前期值'] = result_df[current_col]
# 构建环比计算结果:按用户需求,后面的月份对应前面的月份
# 例如12月对应11月,7月对应6月等
comparison_results = []
try:
# 确保processed_df不为空且至少有两行数据
if len(processed_df) >= 2:
for i in range(1, len(processed_df)):
# 安全地获取当前值和前一个值
if i = 0:
current_value = processed_df.iloc[current_col]
previous_value = processed_df.iloc[i-1][current_col]
current_month = processed_df.iloc[time_col_name]
previous_month = processed_df.iloc[i-1][time_col_name]
if pd.notna(current_value) and pd.notna(previous_value):
change_amount = current_value - previous_value
if previous_value != 0:
change_rate = (change_amount / previous_value) * 100
else:
change_rate = pd.NA
else:
change_amount = pd.NA
change_rate = pd.NA
# 构建环比结果行
comparison_row = {
time_col_name: f'{current_month}环比{previous_month}',
'当前期值': current_value,
'对比期值': previous_value,
'变化量': change_amount,
'变化率(%)': change_rate
}
comparison_results.append(comparison_row)
except Exception as e:
self.root.after(0, lambda: showinfo('错误', f'环比计算错误: {str(e)}'))
self.root.after(0, lambda: self.status_var.set('分析失败'))
return None
# 创建环比结果DataFrame
comparison_df = pd.DataFrame(comparison_results)
# 添加分割行以便区分原始数据和分析结果
separator_row = pd.DataFrame({
time_col_name: ['----------'],
'当前期值': [pd.NA],
'对比期值': [pd.NA],
'变化量': [pd.NA],
'变化率(%)': [pd.NA]
})
# 构造最终结果DataFrame
result_df = pd.concat([
result_df[[time_col_name, '当前期值']], # 原始数据
separator_row, # 分割线
comparison_df # 环比分析结果
]).reset_index(drop=True)
# 确保所有结果列都存在
for col in ['对比期值', '变化量', '变化率(%)']:
if col not in result_df.columns:
result_df[col] = pd.NA
return result_df
def _run_analysis_thread(self):
filepath = self.file_path.get()
current_col = self.current_col.get()
previous_col = self.previous_col.get()
sheet = self.sheet_name.get()
try:
# 更新状态
self.root.after(0, lambda: self.status_var.set('开始分析...'))
self.root.after(0, lambda: self._set_progress(0))
# 使用已经加载的过滤后数据,避免重复读取
if self.filtered_df is not None:
df = self.filtered_df.copy()
else:
# 如果没有过滤数据,则重新读取
self.root.after(0, lambda: self.status_var.set('读取数据...'))
try:
df = pd.read_excel(filepath, sheet_name=sheet, engine='openpyxl')
except FileNotFoundError:
self.root.after(0, lambda: showinfo('错误', f'找不到文件: {filepath}'))
self.root.after(0, lambda: self.status_var.set('分析失败: 文件不存在'))
return
except Exception as e:
self.root.after(0, lambda: showinfo('错误', f'读取文件时出错: {str(e)}'))
self.root.after(0, lambda: self.status_var.set(f'分析失败: {str(e)}'))
return
self.root.after(0, lambda: self._set_progress(20))
# 根据选择的分析类型执行相应分析
analysis_type = self.analysis_type.get()
result_df = None
if analysis_type == 'time_series':
# 时间序列环比分析
time_col_name = self.time_col.get()
self.root.after(0, lambda: self.status_var.set('处理时间序列数据...'))
result_df = self._run_time_series_analysis(df, current_col, time_col_name)
if result_df is None:
return
elif analysis_type == 'comparison':
# 传统的两列对比分析
result_df = self._run_comparison_analysis(df, current_col, previous_col)
if result_df is None:
return
else:
# 如果没有选择分析方式,显示提示
self.root.after(0, lambda: showinfo('提示', '请选择分析方式'))
self.root.after(0, lambda: self.status_var.set('准备就绪'))
return
# 计算合计值,优化计算性能
self.root.after(0, lambda: self.status_var.set('计算汇总数据...'))
if analysis_type == 'time_series':
# 对于时间序列环比分析,只计算环比结果部分的合计
# 找出环比结果开始的位置(从分割行之后开始)
try:
time_col_name = self.time_col.get()
# 安全地查找分隔符索引
if time_col_name in result_df.columns:
separator_index = result_df.index[result_df[time_col_name] == '----------'].tolist()
if separator_index and len(separator_index) > 0 and separator_index[0]+1 10000:
export_df.to_excel(export_path, index=True, engine='xlsxwriter')
else:
export_df.to_excel(export_path, index=True)
showinfo('成功', f'结果已保存到 {export_path}')
except Exception as e:
showinfo('导出错误', f'导出失败: {str(e)}')
# 在主线程中执行文件对话框
self.root.after(100, export_result)
except Exception as e:
self.root.after(0, lambda: showinfo('错误', f'分析失败: {str(e)}'))
self.root.after(0, lambda: self.status_var.set('分析失败'))
self.root.after(0, lambda: self._set_progress(0))
if __name__ == '__main__':
app = ExcelAnalysisApp()