前言
各位好,最近朋友突然跟我提了一个小需求,说是需要计算工作天数并控制预算开票,需要根据总金额来分配不同岗位的工作量,每个岗位单价不同,总金额固定,需要合理分配工作量。手动计算非常繁琐且容易出错,所以开发了这个小工具来解决这个问题,并不能计算薪酬。
功能介绍
这是一个基于Python+Tkinter开发的岗位工作量分配工具,主要功能如下:
核心算法
工具使用了优化算法,确保:
使用方法
[ol]
[/ol]
程序界面截图

image.png (38.28 KB, 下载次数: 0)
下载附件
2025-5-16 10:21 上传
依赖库
代码
[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import random
import threading
import time
import os
import json
try:
import openpyxl
from openpyxl.styles import Font, Alignment, Border, Side
EXCEL_AVAILABLE = True
except ImportError:
EXCEL_AVAILABLE = False
# 配置文件路径
CONFIG_FILE = "config.json"
# 默认岗位单价和名称 - 只在没有配置文件时使用
DEFAULT_POSITIONS = [
{"name": "中级测试工程师", "price": 316.17},
{"name": "高级测试工程师", "price": 422.24},
{"name": "资深测试工程师", "price": 583.46},
{"name": "资深IOS", "price": 658.97},
{"name": "高级Java工程师", "price": 505.89},
{"name": "资深Java工程师", "price": 804.81},
{"name": "高级产品经理", "price": 352.9},
{"name": "产品经理", "price": 407.95},
{"name": "资深产品经理", "price": 542.62},
{"name": "高级前端开发工程师", "price": 614.11},
{"name": "前端开发工程师", "price": 236.54},
{"name": "资深前端开发工程师", "price": 444.67},
{"name": "资深数据开发工程师", "price": 750.75},
{"name": "资深运维工程师", "price": 763.09},
{"name": "初级运维工程师", "price": 312.15},
{"name": "中级运维工程师", "price": 537.01}
]
# 默认工作量限制
DEFAULT_MIN_WORKLOAD = 1.0
DEFAULT_MAX_WORKLOAD = 20.0
# 默认误差阈值
DEFAULT_ERROR_THRESHOLD = 0.05
# 加载配置文件
def load_config():
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
config = json.load(f)
# 支持旧格式
if "names" in config and "prices" in config:
names = config.get("names", [])
prices = config.get("prices", [])
positions = [{"name": name, "price": price}
for name, price in zip(names, prices)]
return positions
# 新格式
positions = config.get("positions", DEFAULT_POSITIONS)
return positions
except Exception as e:
print(f"加载配置文件出错: {str(e)}")
else:
# 配置文件不存在,创建默认配置文件
try:
config = {
"positions": DEFAULT_POSITIONS
}
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
print(f"已创建默认配置文件: {CONFIG_FILE}")
except Exception as e:
print(f"创建配置文件出错: {str(e)}")
return DEFAULT_POSITIONS
# 保存配置到文件
def save_config(positions):
try:
config = {
"positions": positions
}
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"保存配置文件出错: {str(e)}")
return False
# 定义全局版本的fine_tune函数
def fine_tune(workloads, total, error, prices, target_amount, min_workload=DEFAULT_MIN_WORKLOAD, max_workload=DEFAULT_MAX_WORKLOAD, error_threshold=DEFAULT_ERROR_THRESHOLD, max_iterations=1000):
"""微调工作量以减小误差"""
workloads = workloads.copy()
n = len(prices)
iterations_done = 0
# 设置误差阈值
error_threshold = error_threshold
# 记录最佳解决方案
best_workloads = workloads.copy()
best_error = error
best_total = total
# 添加最大运行时间限制(秒),增加到15秒提供更多优化时间
max_runtime = 15.0
start_time = time.time()
# 最小步长,确保精确到0.01
min_step = 0.01
# 直接校正函数 - 当误差接近阈值时使用
def direct_correction(current_workloads, current_error):
"""直接基于误差进行校正"""
if current_error max_runtime:
print("优化超时,返回当前最佳解")
break
# 确定应该增加还是减少工作量
direction = 1 if total 0:
# 单价从低到高排序(增加总额时,先调整低单价岗位效率更高)
indices = sorted(range(n), key=lambda i: prices)
else:
# 单价从高到低排序(减少总额时,先调整高单价岗位效率更高)
indices = sorted(range(n), key=lambda i: -prices)
improved = False
# 步长列表 - 确保包含足够小的步长以达到高精度
initial_steps = [0.1, 0.05, 0.02, 0.01]
# 对每个步长尝试调整每个岗位
for step in initial_steps:
if improved or time.time() - start_time > max_runtime:
break
for idx in indices:
# 防止低于最小工作量或超过最大工作量
if (workloads[idx] = max_workload and direction > 0):
continue
new_workloads = workloads.copy()
new_value = new_workloads[idx] + direction * step
new_workloads[idx] = round(new_value, 2) # 确保两位小数
# 检查工作量范围
if new_workloads[idx] max_workload:
continue
# 使用四位小数计算新总额
subtotals = [round(prices * new_workloads, 4) for i in range(n)]
new_total = sum(subtotals)
new_error = abs(new_total - target_amount)
# 如果误差减小,接受这次调整
if new_error error_threshold:
corrected_workloads, corrected_error = direct_correction(workloads, error)
if corrected_error error_threshold and time.time() - start_time max_runtime:
continue
# 使用更小的步长列表来获得更高精度
for step in [0.02, 0.01]:
# 不能低于最小工作量或超过最大工作量
if workloads = max_workload:
continue
new_workloads = workloads.copy()
new_workloads = round(new_workloads - step, 2)
new_workloads[j] = round(new_workloads[j] + step, 2)
# 检查工作量范围
if new_workloads max_workload or \
new_workloads[j] max_workload:
continue
# 计算新总额
subtotals = [round(prices[k] * new_workloads[k], 4) for k in range(n)]
new_total = sum(subtotals)
new_error = abs(new_total - target_amount)
if new_error 3 and iteration % 3 == 0:
# 根据误差大小直接计算需要的调整量
target_diff = target_amount - total
# 选择适合调整的岗位 - 误差大小接近于单价的倍数
best_adj_idx = -1
best_adj_error = error
for idx in range(n):
# 计算需要调整的工作量
adj_workload = workloads[idx] + target_diff / prices[idx]
# 如果调整后的工作量在范围内
if min_workload = 0 and best_adj_error error_threshold and best_error error_threshold:
corrected_workloads, corrected_error = direct_correction(workloads, error)
if corrected_error = cost_to_max:
# 如果剩余金额足够,直接分配到最大工作量
workloads[idx] = max_workload
remaining_amount -= cost_to_max
else:
# 如果剩余金额不足,精确计算可分配的工作量
possible_increase = remaining_amount / prices[idx]
workloads[idx] = round(min_workload + possible_increase, 2)
remaining_amount = 0
break
# 重新计算总额和误差
total, error = calc_sum_error(workloads)
# 如果有进度回调,先更新初始状态
if update_callback:
update_callback(0, max_iterations, error)
# 第二阶段:通过微调减少误差
# 使用fine_tune函数进行精细调整,增加最大迭代次数
workloads, total, error, iterations = fine_tune(
workloads,
total,
error,
prices,
target_amount,
min_workload,
max_workload,
threshold,
max_iterations
)
# 记录每次迭代的状态并更新进度
iteration_count = iterations
# 如果误差仍然大于阈值,尝试更多的优化方法
if error > threshold:
# 尝试乱序调整,有时候可以找到更好的解
import random
best_workloads = workloads.copy()
best_error = error
best_total = total
for attempt in range(5): # 增加到5次尝试
temp_workloads = workloads.copy()
# 随机选择几个岗位进行小幅调整
for i in range(10): # 增加到10次调整
iteration_count += 1
# 更新进度
if update_callback and iteration_count % 5 == 0:
update_callback(iteration_count, max_iterations, error)
if error 0 and temp_workloads[idx] >= max_workload):
continue
# 随机步长 - 使用更精确的步长
step = random.choice([0.01, 0.02, 0.03, 0.05])
temp_workloads[idx] = round(temp_workloads[idx] + direction * step, 2)
# 确保在范围内
temp_workloads[idx] = max(min_workload, min(max_workload, temp_workloads[idx]))
# 计算新误差
temp_total, temp_error = calc_sum_error(temp_workloads)
# 如果误差减小,接受这次调整
if temp_error threshold:
# 尝试直接基于误差调整工作量
diff = target_amount - total
# 寻找最适合调整的岗位
for idx in sorted_indices:
# 计算需要的调整量
required_change = diff / prices[idx]
new_workload = round(workloads[idx] + required_change, 2)
# 检查是否在允许范围内
if min_workload ", self.on_tree_double_click)
# 表格按钮区域
table_btn_frame = ttk.Frame(frm, style="TFrame")
table_btn_frame.grid(row=3, column=0, columnspan=3, sticky="ew", pady=(0, 5))
# 添加岗位按钮
add_btn = ttk.Button(table_btn_frame, text="添加岗位", command=self.add_position, style="TButton")
add_btn.pack(side="left", padx=5)
# 删除岗位按钮
delete_btn = ttk.Button(table_btn_frame, text="删除岗位", command=self.delete_position, style="TButton")
delete_btn.pack(side="left", padx=5)
# 填充初始数据 - 只显示岗位名称和单价
self.refresh_table()
# 状态栏
status_frame = ttk.Frame(frm, style="TFrame")
status_frame.grid(row=4, column=0, columnspan=3, sticky="ew", pady=(5, 0))
self.result_label = ttk.Label(status_frame, text="总金额:0.00 误差:0.00", style="TLabel")
self.result_label.pack(side="left")
# 检查openpyxl是否可用
if not EXCEL_AVAILABLE:
warning_label = ttk.Label(frm, text="注意: 导出/导入Excel功能需要安装openpyxl库。请运行: pip install openpyxl",
foreground="red", style="TLabel")
warning_label.grid(row=5, column=0, columnspan=3, sticky="w", pady=(5, 0))
def on_tree_double_click(self, event):
"""表格双击事件处理"""
region = self.tree.identify("region", event.x, event.y)
if region == "cell":
column = self.tree.identify_column(event.x)
row = self.tree.identify_row(event.y)
# 只允许编辑名称和单价列
column_index = int(column[1:]) - 1 # 转换为0-based索引
if column_index in [0, 1]: # 名称或单价列
self.edit_cell(row, column, column_index)
def edit_cell(self, row, column, column_index):
"""编辑单元格"""
# 获取当前值
item = self.tree.item(row)
values = item["values"]
# 创建编辑框
entry_edit = ttk.Entry(self.tree)
# 记录当前编辑的单元格
self.current_edit = (row, column, column_index, entry_edit)
# 设置当前值
entry_edit.insert(0, values[column_index])
# 计算位置
bbox = self.tree.bbox(row, column)
if not bbox:
return
# 放置编辑框
entry_edit.place(x=bbox[0], y=bbox[1], width=bbox[2], height=bbox[3])
# 焦点和选择
entry_edit.focus_set()
entry_edit.select_range(0, 'end')
# 绑定事件
entry_edit.bind("", self.on_edit_done)
entry_edit.bind("", self.on_edit_cancel)
entry_edit.bind("", self.on_edit_done)
def on_edit_done(self, event):
"""编辑完成"""
if not self.current_edit:
return
row, column, column_index, entry_edit = self.current_edit
try:
new_value = entry_edit.get()
# 获取当前值
item = self.tree.item(row)
values = list(item["values"])
old_value = values[column_index]
# 验证新值
item_id = self.tree.index(row) # 获取行索引
if column_index == 0: # 名称列
if not new_value.strip():
messagebox.showerror("错误", "岗位名称不能为空!")
entry_edit.focus_set()
return
# 更新数据
self.positions[item_id]["name"] = new_value
values[column_index] = new_value
elif column_index == 1: # 单价列
try:
price = float(new_value)
if price max_possible:
messagebox.showwarning("金额过大",
f"总金额超过了所有岗位按最大工作量{self.max_workload}计算的总和!\n最大可分配金额为{max_possible:.2f}元。\n将按最大工作量分配。")
self.update_status("优化中...")
# 定义一个进度更新函数,使UI保持响应
def progress_update(current_iter, total_iters, current_error):
if current_iter % 10 == 0: # 每10次迭代更新一次界面
self.update_status(f"优化中... 迭代: {current_iter}/{total_iters} 误差: {current_error:.4f}")
self.root.update() # 刷新UI界面
# 设置计算开始时间
start_time = time.time()
# 初始化最低工作量,计算初始总额
initial_workloads = [self.min_workload] * len(prices)
initial_subtotals = [round(prices * initial_workloads, 4) for i in range(len(prices))]
initial_total = sum(initial_subtotals)
initial_error = abs(initial_total - target_amount)
# 更新状态显示初始误差
self.update_status(f"优化中... 初始误差: {initial_error:.4f}")
self.root.update()
# 使用optimize_workloads函数进行工作量分配(从最小工作量开始提高高单价岗位的工作量)
# 这种方式分配的结果更均衡,且对小金额的分配更合理
workloads, total, error, iterations = optimize_workloads(
prices,
target_amount,
update_callback=progress_update, # 传入进度更新回调
max_iterations=max_iterations,
min_workload=self.min_workload,
max_workload=self.max_workload,
error_threshold=self.error_threshold
)
# 如果误差仍然大于阈值,采用额外的精度控制措施
if error > self.error_threshold:
self.update_status(f"精确调整中... 当前误差: {error:.4f}")
self.root.update()
# 尝试直接使用算法分配
alternative_workloads = allocate_high_price_first(
prices, target_amount, self.min_workload, self.max_workload, self.error_threshold
)
# 计算新的总额和误差
alternative_subtotals = [round(prices * alternative_workloads, 4) for i in range(len(prices))]
alternative_total = sum(alternative_subtotals)
alternative_error = abs(alternative_total - target_amount)
# 更新状态显示替代算法误差
self.update_status(f"尝试替代算法... 误差: {alternative_error:.4f}")
self.root.update()
# 如果替代方案更好,则使用替代方案
if alternative_error self.error_threshold:
self.update_status(f"精细微调中... 当前误差: {error:.4f}")
self.root.update()
# 按单价从高到低排序
sorted_indices = sorted(range(len(prices)), key=lambda i: -prices)
# 尝试微调前几个岗位的工作量
improved = False
for idx in sorted_indices[:3]: # 只考虑前三个高单价岗位
if improved: # 如果已经改进了,就不再继续调整
break
# 确定微调方向
direction = 1 if total ", lambda event: confirm_add())
add_window.bind("", lambda event: add_window.destroy())
def delete_position(self):
"""删除选中的岗位"""
selected = self.tree.selection()
if not selected:
messagebox.showinfo("提示", "请先选择要删除的岗位!")
return
if len(self.positions) = cost_to_max:
# 如果剩余金额足够,直接分配到最大工作量
workloads[idx] = max_workload
remaining -= cost_to_max
else:
# 如果剩余金额不足,分配尽可能多的工作量,严格保证两位小数
additional = remaining / prices[idx]
workloads[idx] = round(min(max_workload, min_workload + additional), 2)
# 重新计算实际分配的金额(四位小数精度)
actual_additional_cost = round((workloads[idx] - min_workload) * prices[idx], 4)
remaining = round(remaining - actual_additional_cost, 4)
break
# 对结果进行精确计算验证
subtotals = [round(prices * workloads, 4) for i in range(n)]
total = sum(subtotals)
# 尝试微调最后一个分配的岗位,以减小误差
error = abs(total - target_amount)
if error > error_threshold:
# 获取最后一个非最大也非最小工作量的岗位
adjustable_idx = -1
for idx in sorted_indices:
if min_workload = 0:
# 尝试微调这个岗位的工作量
direction = 1 if total threshold:
workloads[idx] = 20.0
remaining -= max_money
else:
# 只要剩余金额减去本岗位最大分配后小于阈值,就不再分配到20
break
return workloads, remaining
def max_allocation_high_price_first(prices, target_amount):
n = len(prices)
indices = sorted(range(n), key=lambda i: -prices) # 单价从高到低
workloads = [1.0] * n
remaining = target_amount - sum(prices) # 先分配1.0
for idx in indices:
max_extra = 19.0 # 1.0已分配,还能分配19
can_allocate = min(max_extra, remaining / prices[idx])
workloads[idx] = 1.0 + can_allocate
if workloads[idx] > 20.0:
workloads[idx] = 20.0
remaining -= (workloads[idx] - 1.0) * prices[idx]
if remaining