第一部分:从网盘获取大练兵数据并存入 redis
import time
import json
import arrow
import requests
import datetime
import pandas as pd
import redis
# Redis store (db 3) that holds the scraped listings; decode_responses=True
# makes hget/hset work with str instead of bytes.
rr = redis.StrictRedis(host='ip', port=6804, db=3, decode_responses=True)

# Browser credentials for the online disk (Cloudreve session).
# NOTE(review): hard-coded session cookie is a secret checked into source —
# consider loading it from redis/config instead.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
    'Accept': 'application/json, text/plain, */*',
    'Cookie': 'cloudreve-session=MTY4NDExMjQyMHxOd3dBTkRWS01qTkNVRkpUVjFaUU5ETTBSVmhFUmxkUlJFaEJXbEpUTWs5WE4wc3pTRW8zV1ZrMFZWQlZUa1EzUlVKWE5VZFFRVkU9fH6n8iuWQ99K1RuM3BsnwI0aympkQA_nYcHyGFY1bZTl'
}
def read_wangpan_lists(day_now):
    """Collect one record per uploaded file for every user for one day.

    Reads user ids from name_list.txt (one "uid-username" per line), queries
    the web-disk directory API for each user and flattens the listing.

    Args:
        day_now: date string in "month.day" form, e.g. '4.23'.
            The year is hard-coded to 2023 below — TODO confirm.

    Returns:
        list of dicts with keys 'name' (username), 'uid' (int),
        'wangpan' (file name) and 'riqi' ('YYYY-MM-DD').
    """
    nn_list = []
    # The date is identical for every user, so compute it once, not per line.
    month, day = day_now.split('.')
    riqi = arrow.get(year=2023, month=int(month), day=int(day)).format('YYYY-MM-DD')
    with open('name_list.txt', 'r', encoding='utf-8') as fp:
        for line in fp:
            line = line.replace('\n', '')
            url = f'http://xxx5/api/v3/directory/xxx/{day_now}/{line}'
            # timeout keeps one dead host from hanging the whole sweep forever
            res = requests.get(url=url, headers=header, timeout=30)
            print(url)
            print(res.text)
            # Each line is "uid-username" on the web disk.
            parts = line.split('-')
            uid = int(parts[0])
            username = parts[1]
            try:
                # 'data' / 'objects' is the Cloudreve listing payload.
                for x in res.json()['data']['objects']:
                    nn_list.append({'name': username, 'uid': uid,
                                    'wangpan': x['name'], 'riqi': riqi})
            except Exception as e:
                # Best effort: a user with no folder for that day is expected,
                # so log and keep going instead of aborting the sweep.
                print(111, '文件夹读取出错')
                print(e)
    return nn_list
if __name__ == '__main__':
    # Poll forever: re-scrape every tracked day, push the JSON blob into the
    # 'chunji' redis hash keyed by date, then sleep before the next sweep.
    while True:
        for riqi in ['5.8', '5.6', '4.26', '4.25', '4.24', '4.23', '4.22']:
            try:
                nnlist = read_wangpan_lists(riqi)
                rr.hset('chunji', riqi, json.dumps(nnlist))
            except Exception as e:
                # Top-level boundary: log and keep the poller alive.
                print('出错: ', e)
        # NOTE(review): the paste lost indentation — sleeping once per full
        # sweep (not per date) looks intended; confirm against the original.
        time.sleep(180)
第二部分:Web UI使用的库:streamlit,从另外一部分地方获取数据(本地NAS)
import streamlit as st
import pandas as pd
from pathlib import Path
import random
import arrow
import yaml
import io
import redis
import base64
import json
# Redis connection shared with the collector script (same host/db).
rr = redis.StrictRedis('xxxip', 6804, 3, decode_responses=True)

# Banner timestamp (Shanghai local time) shown under the page title.
now = arrow.get(tzinfo='Asia/Shanghai').format('YYYY年MM月DD日 , HH时mm分ss秒')
now_2 = '统计时间为:' + now

# Default date; overwritten by the selectbox choice below.
day_now = '4.23'

st.set_page_config(
    page_title="春季大练兵统计",
    page_icon='https://xxxx.png',
    layout='centered',
    initial_sidebar_state='auto',
    menu_items=None,
)

logo_col, title_col = st.columns([1, 4])
logo_col.image('pic/logo.png', width=80)
title_col.title('春季大练兵统计')
st.info(now_2)

# The list of selectable dates lives in cfg.yml under the 'riqi' key.
with open('cfg.yml', 'r') as cfg_file:
    cfg = yaml.load(cfg_file, yaml.SafeLoader)
mm = st.selectbox('日期选择:', cfg.get('riqi'))
day_now = mm
def get_all_mp4(root='D:\\春季大练兵\\4.20'):
    """Count the files each user uploaded to the local NAS folder.

    The NAS folder (188.24) holds one sub-directory per user, named
    "uid-username"; every file inside counts as one submission.

    Args:
        root: folder to scan. Defaults to the original hard-coded path so
            existing callers are unchanged; parameterized because the date
            portion of the path changes per drill day.

    Returns:
        list of dicts with keys 'name' (username), 'uid' (int) and
        'vvs' (number of files in the user's folder).
    """
    results = []
    for user_dir in Path(root).iterdir():
        parts = user_dir.name.split('-')
        results.append({
            'name': parts[1],
            'uid': int(parts[0]),
            # Count without materializing the whole listing into a list.
            'vvs': sum(1 for _ in user_dir.iterdir()),
        })
    return results
# 76
# Roster of all participants (uid / real name / nickname / dept / city).
df_users = pd.read_excel('users.xlsx')
df_users.columns = ['uid', 'zsxm', 'name', 'dept', 'city']

# Per-file records for the selected day, written to redis by the collector.
nn_list = json.loads(rr.hget('chunji', day_now))
df = pd.DataFrame(nn_list)

# Number of uploaded videos per user.
df_wangpan = df.groupby(['uid'])['wangpan'].count()
print(df_wangpan.head())

# Outer merge keeps users with zero uploads; their count comes out NaN.
df = pd.merge(df_users, df_wangpan, 'outer', on='uid')
df.columns = ['编号', '姓名', '花名', '部门', '城市', f'{day_now}视频数']
df = df[['编号', '花名', '部门', '城市', f'{day_now}视频数']]
# 1 if the user submitted anything, else 0 (NaN > 0 is False, so NaN maps to 0).
df['upit'] = df[f'{day_now}视频数'].apply(lambda x: 1 if x > 0 else 0)
df['sum'] = 1  # one row per person, so summing yields the headcount

# Per-department submitted / total / missing counts.
df_dept = df[['部门', 'upit', 'sum']].groupby('部门', as_index=False).sum()
df_dept.columns = ['部门', '已提交人数', '总人数']
df_dept['未提交人数'] = df_dept['总人数'] - df_dept['已提交人数']

yitijiao = df['upit'].sum()
col1, col2, col3 = st.columns(3)
# NOTE(review): 187 looks like a hard-coded total headcount — presumably
# len(df_users); the random values are cosmetic "delta" decorations only.
col1.metric("已提交人数", f"{yitijiao} 人", f"{random.randint(2, 5)} 人")
col2.metric("未提交人数", f"{187 - yitijiao} 人", f"-{random.randint(2, 5)}人")
col3.metric("提交率", f"{round(yitijiao / 187 * 100, 2)}%", "4%")

st.bar_chart(df_dept[['部门', '已提交人数', '未提交人数']], x='部门')
st.dataframe(df_dept, height=500, width=760)

# Per-person detail table; missing counts shown as 0.
df_mingxi = df[['编号', '花名', '部门', '城市', f'{day_now}视频数']].fillna(0)
st.dataframe(df_mingxi, height=500, width=760)
def create_download_link_csv(df, title="点击下方链接下载 Excel 文件:"):
    """Serialize *df* to an in-memory xlsx and return an HTML download link.

    The workbook is base64-encoded into a data: URI so the page can offer a
    download without writing a file to disk; render the returned string with
    st.markdown(..., unsafe_allow_html=True).
    """
    output = io.BytesIO()
    # Context manager flushes/closes the workbook; ExcelWriter.save() was
    # deprecated and removed in pandas 2.0.
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='Sheet1')
    output.seek(0)
    b64 = base64.b64encode(output.read()).decode()
    # BUG FIX: the pasted version computed b64 but never embedded it (the
    # "[url=]下载明细[/url]" text is forum-mangled markup), so the link carried
    # no payload. Embed the data: URI in a real anchor.
    href = ('data:application/vnd.openxmlformats-officedocument.'
            f'spreadsheetml.sheet;base64,{b64}')
    return f'{title} <a href="{href}" download="mingxi.xlsx">下载明细</a>'
# Build the download link for the per-person detail table.
download_link = create_download_link_csv(df_mingxi)
# Render the link on the page; unsafe_allow_html lets raw HTML through.
st.markdown(download_link, unsafe_allow_html=True)
欢迎各位大佬共同学习!