python写的同时看多台服务器的日志

查看 16|回复 1
作者:SherlockProel   
我公司没有ELK等查看日志的工具,然而还用了微服务,那么线上排查日志就尴尬了,我们一个服务有四个节点,但是请求只发到某一个上面,所以每次查看日志的时候都得同时登录四台服务器查看;万幸服务器的日志的全路径是一样的,都在/root/logs/服务/log_total.log下面,逛吾爱的时候发现有人用python写工具,我想是不是也可以搞一个试试,于是就做了这个工具,支持十几个感觉没问题,当然1个也能用,运行结果和代码都在下面。
使用方法很简单,第一个textarea是输入多台服务器的,格式是 ip|root|password,如果多个服务就直接换行,如果你密码里面带“|”可以在源码里面改一下,我贴在下面了;第二个输入框就是查看日志的命令,grep,tail,awk等都支持,原则上支持所有带返回值的命令;然后点击查询就行了,下面结果区有横向竖向的滚动条,如果想把日志换行也可以在代码里面修改。
[Python] 纯文本查看 复制代码#界面
import tkinter as tk
# 导入 messagebox 用于提示
from tkinter import ttk, messagebox
#新开一个线程
import threading
#倒计时
import time
#通过 SSH 协议与远程服务器交互
import paramiko
"""
@description: 简单的发送命令到Linux并且接受返回值,服务器数量最高无上限,和你在textarea中输入的数量一样
ip地址|用户名|密码
ip地址|用户名|密码
ip地址|用户名|密码
ip地址|用户名|密码
"""
class LogViewerApp:
    def __init__(self, root):
        self.root = root
        self.window_size = 850
        self.init_ui()
    def init_ui(self):
        """初始化主窗口和整体布局"""
        self.root.title("查看linux服务器日志@XBL")
        self.root.geometry(f"{self.window_size}x{self.window_size}")
        self.create_top_frame()
        self.create_bottom_frame()
    def create_top_frame(self):
        """创建顶部控制区域"""
        self.top_frame = tk.Frame(self.root, height=int(self.window_size * 0.1), bd=1, relief="solid")
        self.top_frame.pack(side="top", fill="x")
        self.create_textarea_top()
        self.create_input_fields()
        self.create_query_button()
        self.create_loading_label()
    def create_textarea_top(self):
        """创建第一排的 textareaip@user@password"""
        self.text_area_top = tk.Text(self.top_frame, height=2, width=35)
        self.text_area_top.insert("1.0", "ip地址|用户名|密码")
        self.text_area_top.pack(side="left")
    def create_input_fields(self):
        """执行命令框"""
        self.exec_field = tk.Entry(self.top_frame, width=60)
        self.exec_field.insert(0, "tail -n 100 /root/logs/snowy-biz-app-log/log_total.log")
        self.exec_field.pack(side="left", padx=1, expand=True, fill="both")
    def create_query_button(self):
        """创建查询按钮"""
        self.button = tk.Button(self.top_frame, text="查询", width=5, command=self.on_query)
        self.button.pack(side="left", padx=1)
    def create_loading_label(self):
        """创建 loading 提示标签"""
        self.loading_label = tk.Label(self.top_frame, text="", width=15,fg="red")
        self.loading_label.pack(side="left", padx=1)
    def create_bottom_frame(self):
        """创建底部日志显示区域"""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(side="top", fill="both", expand=True)
        # 初始化一个默认页面(可选)
        default_tab = tk.Frame(self.notebook)
        self.notebook.add(default_tab, text="请查询")
    def on_query(self):
        """点击查询时触发的逻辑"""
        if not self.validate_inputs():
            return
        self.loading_label.config(text="加载中...")
        self.button.config(state=tk.DISABLED)
        threading.Thread(target=self.background_task).start()
    #查询按钮入口
    def validate_inputs(self):
        """验证输入是否为空"""
        ip_user_pwd = self.text_area_top.get("1.0", tk.END).strip()
        cmd_field = self.exec_field.get().strip()
        if not ip_user_pwd:
            messagebox.showwarning("输入错误", "第一个输入框不能为空!")
            return False
        if not cmd_field:
            messagebox.showwarning("输入错误", "第二个输入框不能为空!")
            return False
        return True
    def background_task(self):
        """后台任务:执行 SSH 命令并更新结果"""
        try:
            result = self.run_ssh_command()
            self.update_result(result)
        except Exception as e:
            self.update_result(f"SSH 错误:{str(e)}")
    def update_result(self, results):
        """
       更新结果显示到 notebook 的各个 tab
       :param results: dict 类型,格式:{"host": "output"}
       """
        # 清空原有 tab
        for tab in self.notebook.winfo_children():
            self.notebook.forget(tab)
            tab.destroy()
        for host, output in results.items():
            tab = tk.Frame(self.notebook)
            self.notebook.add(tab, text=host)
            # 创建带滚动条的 Text 控件
            text_area = tk.Text(tab, wrap=tk.NONE)
            scroll_y = tk.Scrollbar(tab, orient=tk.VERTICAL, command=text_area.yview)
            scroll_x = tk.Scrollbar(tab, orient=tk.HORIZONTAL, command=text_area.xview)
            text_area.config(yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set)
            text_area.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
            scroll_y.pack(side=tk.RIGHT, fill=tk.Y)
            scroll_x.pack(side=tk.BOTTOM, fill=tk.X)
            # 只读模式
            text_area.insert(tk.END, output)
            text_area.config(state=tk.DISABLED)
            # 添加 tab 到 notebook 截断长主机名防止显示异常
            self.notebook.add(tab, text=host[:15])
        self.loading_label.config(text="")
        self.button.config(state=tk.NORMAL)
    #ssh远程执行命令
    def run_ssh_command(self):
        ip_user_pwd_list = self.text_area_top.get("1.0", tk.END).strip().splitlines()
        results = {}
        for line in ip_user_pwd_list:
            if not line.strip():
                continue
            host, user, pwd = line.split("|")
            cmd_field = self.exec_field.get().strip()
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                client.connect(hostname=host, port=22, username=user, password=pwd)
                stdin, stdout, stderr = client.exec_command(cmd_field)
                output = stdout.read().decode()
                error = stderr.read().decode()
                results[host] = output or error
            except Exception as e:
                results[host] = f"SSH 错误:{str(e)}"
            finally:
                client.close()
        return results
if __name__ == "__main__":
    root = tk.Tk()
    #给tab设置样式
    style = ttk.Style()
    style.configure("TNotebook.Tab", padding=[10, 5], font=('Arial', 10))
    #屏幕最大化
    try:
        # Windows/Linux 使用 zoomed
        root.state('zoomed')
    except:
        # macOS 或不支持 zoomed 的系统使用屏幕尺寸
        screen_width = root.winfo_screenwidth()
        screen_height = root.winfo_screenheight()
        root.geometry(f"{screen_width}x{screen_height}")
    app = LogViewerApp(root)
    def ssh():
        try:
            result = app.run_ssh_command()
            # 使用 after 安全线程更新
            root.after(0, lambda: update_result(result))
        except Exception as e:
            error_message = f"发生错误:{str(e)}"
            root.after(0, lambda: update_result(error_message))
    def update_result(result_text):
        app.text_area_bottom.delete("1.0", tk.END)
        app.text_area_bottom.insert(tk.END, result_text)
    # threading.Thread(target=ssh).start()
    root.mainloop()

日志, 服务器

SherlockProel
OP
  

第一次发帖不会玩儿,我就放了一张图,帖子里面的图重复了。
您需要登录后才可以回帖 登录 | 立即注册

返回顶部