监听本地文件,上传指定接口

查看 54|回复 4
作者:余生有妮   
这两天接了个私活,设备需要对接。设备厂家太low不支持对接,只能实现报告保存到指定目录。
要求:业务系统可以接收到设备的报告单;
最开始是想在设备电脑装上 apache或者
Nginx
  把报告目录变成http访问进行抓取,后来考虑设备电脑可能会经常关闭,会影响服务器效率就用python编写了个监听服务并且监听到文件后上传到指定接口;
代码如下
[Python] 纯文本查看 复制代码import os
import time
import logging
from urllib.parse import urlencode
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import requests
from datetime import datetime
# 配置日志
log_directory = 'logs'
if not os.path.exists(log_directory):
    os.makedirs(log_directory)
log_filename = os.path.join(log_directory, f"{datetime.now().strftime('%Y%m%d')}.log")
logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 接口URL(上传pdf文件到)
API_URL = 'http://192.168.9.79:8080/thirdParty/boneAge/pdf'
class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.is_directory:
            return None
        elif event.src_path.endswith(".pdf"):
# 由于接口需要获取报告的检查时间,所以只能取文件最后修改时间来当做检查时间
            mod_time = datetime.fromtimestamp(os.path.getmtime(event.src_path)).strftime('%Y-%m-%d %H:%M:%S')
            logging.info(f"获取文件最后修改时间: {mod_time}")
            try:
                files = {'file': (os.path.basename(event.src_path), open(event.src_path, 'rb'), 'application/pdf')}
                query_params = {'update_time': mod_time}
                url_with_query = f"{API_URL}?{urlencode(query_params)}"
                logging.info(f"开始请上传报告文件 {url_with_query}")
                response = requests.post(url_with_query, files=files)
                response_data = response.json()
# 我在服务端接口返回的json格式{"status":true,"message":"成功"}
                if response_data['status']:
                    logging.info(f"File {event.src_path} : {response_data['message']}")
                else:
                    logging.error(f"文件上传失败 {event.src_path}: {response_data['message']}")
            except Exception as e:
                logging.error(f"文件上传异常 {event.src_path}: {str(e)}")
def start_observer(path):
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
if __name__ == "__main__":
    directory_to_watch = 'D:\\AgeExpert_Data\\rep_upload_data'
    start_observer(directory_to_watch)

接口, 宋体

kentsolver   

首排????
感受老司机的爱   

好新鲜,有点想学怎么用
Oldpit   

好思路,学习了,
yy20240721   

前排前排,先占再学
您需要登录后才可以回帖 登录 | 立即注册

返回顶部