Python批量备份交换机配置+自动巡检

查看 26|回复 0
作者:winty   
继上次发帖批量备份的功能后,有坛友要求增加功能,目前版本支持SSH、Telnet、自定义端口,增加了部分异常处理。
设备厂商支持思科、华为、华三。
自动巡检功能考虑到不同设备回显有所不同,需要大量正则匹配,暂时没时间搞这些,所以索性将命令回显全部显示,没做进一步的回显提取。
以下是程序运行示例:
[color=]#自动备份配置:


image.png (76.82 KB, 下载次数: 0)
下载附件
2023-3-24 15:19 上传

备份完成后,将配置保存于程序目录下的conf_bak文件夹下,如图所示:


image.png (29.56 KB, 下载次数: 0)
下载附件
2023-3-24 15:25 上传



image.png (38.18 KB, 下载次数: 0)
下载附件
2023-3-24 15:26 上传

[color=]#自动巡检交换机设备:


image.png (237.08 KB, 下载次数: 0)
下载附件
2023-3-24 15:20 上传

使用前需创建excel文档,写入设备登录信息,格式如下:


image.png (29.51 KB, 下载次数: 0)
下载附件
2023-3-24 15:24 上传

本人编码能力一般,没学过多少,大佬们勿喷,源码奉上:
[Python] 纯文本查看 复制代码# coding=utf-8
from netmiko import ConnectHandler
from openpyxl import load_workbook
import os
from sys import exit
from netmiko import exceptions
import re
# 读取excel内设备列表信息
def check_and_get_dev_list(filename, sheet_name):
    excel_information = []
    sheet_header = []
    wb = load_workbook(filename)
    sh = wb[sheet_name]
    # 获取最大行数
    row = sh.max_row
    # 获取最大列数
    column = sh.max_column
    data = []
    # 获取表头写入列表中方便调用
    for data_1 in range(1, column+1):
        get_sheet_header = sh.cell(row=1, column=data_1).value
        sheet_header.append(get_sheet_header)
    # 第一行为表头, 此处 row +1 是pyton循环时不读取最后一个数
    for row_1 in range(2, row + 1):
        # 存储一行信息
        sheet_data_1 = dict()
        # 逐行读取表中的数据
        for b in range(1, column + 1):
            cell = sh.cell(row=row_1, column=b).value
            # 将数据已字典形式写入 sheet_data_1 中
            # if cell != None:
            sheet_data_1[sheet_header[b-1]] = cell
        excel_information.append(sheet_data_1)
    for i in excel_information:
        if i['ip'] != None:
            data.append(i)
    return data
#获取excel数据并整合成dev字典
def get_dev():
    res = check_and_get_dev_list('./resource.xlsx', 'Sheet1')
    devices = []
    for i in res:
        if i['protocol'] == 'telnet':
            i['type'] = i['type']+'_telnet'
        dev = {'device_type':i['type'],
               'host': i['ip'],
               'username': i['username'],
               'password': i['password'],
               'secret': i['enpassword'],
               'port': i['port'],}
        devices.append(dev)
    return devices
# 配置批量备份导出
def devices_confbak(devices=''):
    # 创建备份文件夹
    try:
        path = './conf_bak'
        os.makedirs(path)
    except FileExistsError:
        pass
    # 存储连接失败的IP
    failed_ips = []
    # 循环登录设备获取配置
    for dev in devices:
        try:
            with ConnectHandler(**dev) as conn:
                print('\n----------成功登录到:' + dev['host'] + '----------')
                conn.enable()
                if 'cisco_ios' in dev['device_type']:
                    output = conn.send_command(command_string='show run')
                elif 'huawei' or 'hp_comware' in dev['device_type']:
                    output = conn.send_command(command_string='dis current-configuration')
                else:
                    print('error')
                with open('./conf_bak/'+ dev['host'] +'_conf_bak.txt', mode='w', encoding='utf8') as f:
                    print('正在备份:'+dev['host'])
                    # 文件读写异常处理
                    try:
                        f.write(output)
                    except PermissionError:
                        print('*****-无写入权限,请将文件夹赋予读写权限-*****')
                        continue
                    else:
                        print('备份成功!')
        # 连接异常处理
        except exceptions.NetmikoAuthenticationException:
            print('\n**********'+dev['host']+':登录验证失败!**********')
            failed_ips.append(dev['host'])
            continue
        except exceptions.NetmikoTimeoutException:
            print('\n**********'+dev['host']+':目标不可达!**********')
            failed_ips.append(dev['host'])
            continue
        except exceptions.ReadTimeout:
            print('\n**********'+dev['host']+':读取超时,请检查enable密码是否正确!**********')
            failed_ips.append(dev['host'])
            continue
    if len(failed_ips) > 0:
        print('\n以下设备连接失败,请检查:')
        for x in failed_ips:
            print(x)
    return 1
# 配置巡检
def devices_autocheck(devices='', cmd=''):
    # 存储命令执行回显
    results = []
    try:
        for x in range(len(devices)):
            # 循环登录设备
            with ConnectHandler(**devices[x]) as conn:
                conn.enable()
                print('正在巡检:'+devices[x]['host']+' ...')
                result = [devices[x]['host'],devices[x]['device_type']]
                for i in range(len(cmd)):
                    # 循环执行命令,根据不同设备执行不同命令
                    if 'cisco_ios' in devices[x]['device_type']:
                        output = conn.send_command(command_string=str(cmd['cisco']))
                    elif 'huawei' or 'hp_comware' in devices[x]['device_type']:
                        conn.send_command(command_string='sys',expect_string=']')
                        output = conn.send_command(command_string=str(cmd['huawei']))
                    result.append(output)
                results.append(result)
    except exceptions.NetmikoAuthenticationException:
        print('\n**********'+devices[x]['host']+':登录验证失败!**********')
    except exceptions.NetmikoTimeoutException:
        print('\n**********' + devices[x]['host'] + ':目标不可达!**********')
    except exceptions.ReadTimeout:
        print('\n**********' + devices[x]['host'] + ':读取超时,请检查enable密码是否正确!**********')
    return results
# 计算内存使用率
def get_mem(memstr,devtype=''):
    if 'cisco' in devtype:
        total_match = re.search(r'Processor Pool Total:\s+(\d+)', memstr)
        used_match = re.search(r'Used:\s+(\d+)', memstr)
        # 提取总数和已用数,并将其转换为整数
        total = int(total_match.group(1))
        used = int(used_match.group(1))
        # 计算使用百分比
        percentage = used / total * 100
        return f"{percentage:.0f}%"
    elif 'huawei' in devtype:
        match = re.search(r"Memory Using Percentage Is:\s*(\d+)%", memstr)
        if match:
            memory_percentage = match.group(1)
            return memory_percentage+'%'
        else:
            return "No match found."
# 获取CPU利用率
def get_cpu(cpustr,devtype=''):
    if 'cisco' in devtype:
        pattern = r"CPU utilization for five seconds: (\d+)%"
        match = re.search(pattern, cpustr)
        if match:
            cpu_utilization = match.group(1)
            return cpu_utilization+'%'
        else:
            return "No match found."
    elif 'huawei' in devtype:
        match = re.search(r"\b(\d+(\.\d+)?)%.*?\bMax", cpustr)
        if match:
            cpu_utilization = match.group(1)
            return cpu_utilization+'%'
        else:
            return "No match found."
# 运行主程序
if __name__ == '__main__':
    while True:
        print("\n##############################################\n")
        print("1:批量备份交换机配置")
        print("2:批量巡检交换机设备")
        print("0:退出")
        option = str(input("请输入需要的操作编号:"))
        if option == '1':
            dev = get_dev()
            devices_confbak(devices=dev)
            continue
        elif option == '2':
            # 定义巡检命令
            # cmds[x]['cisco']
            # cmds[x]['huawei']
            cmds = [
                {'cisco':'show clock','huawei':'display clock'},                        #检查时钟
                {'cisco':'show env power','huawei':'display power'},                    #检查电源
                {'cisco':'show env fan','huawei':'display fan'},                        #检查风扇
                {'cisco':'show env temperature status', 'huawei': 'display environment'},#检查温度
                {'cisco':'show processes cpu', 'huawei': 'display cpu-usage'},          #检查CPU利用率
                {'cisco':'show processes memory', 'huawei': 'display memory-usage'},    #检查内存利用率
            ]
            dev = get_dev()
            checkres = devices_autocheck(dev,cmds)
            for res in checkres:
                # print(res)
                print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
                print(res[0]+'-巡检结果:')
                print('\n时钟:\n'+res[2])
                print('电源:\n'+res[3])
                print('风扇:\n'+res[4])
                if 'Unrecognized command' in res[5]:
                    print('温度:\n该设备不支持获取此数据!')
                else:print('温度:\n'+res[5])
                print('CPU利用率:\n' + get_cpu(res[6],res[1]))
                print('内存利用率:\n' + get_mem(res[7],res[1]))
                print('\n+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
            continue
        elif option == '0':
            break
        else:
            print("请输入正确的编号!")

微软, 备份

您需要登录后才可以回帖 登录 | 立即注册