一个简单的IP地址管理系统IPAM 基于交换机的IP发现扫描

查看 114|回复 11
作者:xiaomingtt   
我们单位内网16个网段,所有终端都是静态IP。经常遇到找不到可用IP的情况。
有安全管理软件和准入系统,但网络设备、服务器、打印机等里面没有。
后来用PHPIPAM,后来加强了安全管理,PHPIPAM扫不到PC终端了。
后来用资产系统管理IP,因为更新不及时,更是不靠谱。
最后为了找一个能用的IP,经常要在准入系统、资产系统都查一遍,再在接入交换机查一遍才行。
也就是在接入交换机通过“display arp”命令(H3C)查询终端是否在线给了灵感。
我在核心交换机上用“display arp”一查,果然单位所有在线IP都能找到。
于是就用python做了这个程序,定时登录核心交换机执行“display arp”命令,并通过网页展示IP占用情况。


2025-09-05_084324.png (162.27 KB, 下载次数: 1)
下载附件
2025-9-5 11:26 上传

[Python] 纯文本查看 复制代码# -*- coding: utf-8 -*-
from http.server import SimpleHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs
from netmiko import ConnectHandler, NetMikoTimeoutException, NetMikoAuthenticationException
import json
import os
from datetime import datetime
from collections import defaultdict
import ipaddress
import html as htmllib
import threading
import time
# ----------------- 配置 -----------------
DATA_FILE = "data.json"
HOST = "0.0.0.0"
PORT = 10086
SCAN_INTERVAL = 600  
AUTO_SCAN = True   
# 交换机设备参数
device_yd = {
    'device_type': "hp_comware",
    'ip': "172.10.119.252",
    'username': "admin",
    'password': "Admin@1234",
    'port': 22,
    'session_log': 'h3clog.log',
    'session_log_file_mode': 'append',
    'timeout': 180,
    'fast_cli': False,
    'global_delay_factor': 1,
}
device_lt = {
    'device_type': "hp_comware",
    'ip': "172.10.119.253",
    'username': "admin",
    'password': "Admin@1234",
    'port': 22,
    'session_log': 'h3clog.log',
    'session_log_file_mode': 'append',
    'timeout': 180,
    'fast_cli': False,
    'global_delay_factor': 1,
}
# 线程安全的数据访问锁
data_lock = threading.Lock()
# ----------------- 数据读写 -----------------
def load_data():
    """线程安全读取 JSON 数据文件。"""
    with data_lock:
        if os.path.exists(DATA_FILE):
            try:
                with open(DATA_FILE, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception as e:
                print(f"[load_data] 加载数据文件失败: {e}")
                return {}
        return {}
def save_data(data):
    """线程安全写回 JSON 数据文件。"""
    with data_lock:
        try:
            with open(DATA_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=4, ensure_ascii=False)
        except Exception as e:
            print(f"[save_data] 保存数据文件失败: {e}")
# ----------------- 交换机扫描逻辑 -----------------
def displayarp(device):
    """
    登录设备,执行 ARP 查询。
    """
    try:
        with ConnectHandler(**device) as connect:
            print(f"[displayarp] 连接到设备 {device['ip']} 成功")
            outtxt = connect.send_command(
                "display arp | include 172.10.", read_timeout=90
            )
            return outtxt
    except NetMikoTimeoutException:
        print(f"[displayarp] 设备 {device['ip']} 连接超时!")
    except NetMikoAuthenticationException:
        print(f"[displayarp] 设备 {device['ip']} 认证失败!")
    except Exception as e:
        print(f"[displayarp] 设备 {device['ip']} 未知错误: {e}")
    return ""
def parse_output(output):
    result = []
    if not output:
        return result
    for line in output.strip().split("\n"):
        parts = line.split()
        if len(parts) >= 2:
            ip, mac = parts[0], parts[1]
            result.append({"ip": ip, "mac": mac})
    return result
def update_data(parsed, data):
    """
    规则:
    - 本次扫描到的 IP 标记为 online,更新 mac/last_seen
    - 数据中存在但本次未扫描到的 IP 标记为 offline
    - user / device_type 不被程序覆盖(只保留已有值)
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    seen_ips = {item["ip"] for item in parsed}
    # 扫描到的 -> online
    for item in parsed:
        ip = item["ip"]
        mac = item["mac"]
        if ip not in data:
            data[ip] = {
                "mac": mac,
                "last_seen": now,
                "status": "online",
                "user": "",
                "device_type": ""
            }
        else:
            # 保留手工字段
            user = data[ip].get("user", "")
            device_type = data[ip].get("device_type", "")
            data[ip].update({
                "mac": mac,
                "last_seen": now,
                "status": "online",
                "user": user,
                "device_type": device_type
            })
    # 未扫描到的 -> offline(只处理曾经见过的)
    for ip in list(data.keys()):
        if ip not in seen_ips and "last_seen" in data[ip]:
            data[ip]["status"] = "offline"
    return data
def perform_scan():
    """执行扫描并更新数据。"""
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 开始自动扫描...")
    # 从两个设备获取 ARP 信息
    output1 = displayarp(device_yd)
    output2 = displayarp(device_lt)
    # 解析输出
    parsed = parse_output(output1) + parse_output(output2)
    # 更新数据
    data = load_data()
    data = update_data(parsed, data)
    save_data(data)
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 扫描完成,找到 {len(parsed)} 条记录")
def auto_scan_loop():
    """自动扫描循环。"""
    while True:
        try:
            perform_scan()
        except Exception as e:
            print(f"[auto_scan_loop] 自动扫描出错: {e}")
        time.sleep(SCAN_INTERVAL)
# ----------------- HTML 生成 -----------------
def ip_to_net(ip):
    """取网段前三位,例如 172.10.116.251 -> 172.10.116"""
    return ".".join(ip.split(".")[:3])
def safe(s):
    """HTML 转义输出。"""
    return htmllib.escape(str(s if s is not None else ""), quote=True)
def generate_html(data):
    """
    返回完整 HTML(同时也会写一个 index.html 到磁盘,方便离线查看)
    """
    nets = defaultdict(dict)
    # 统计容器:occupied/online 均只统计 1..254 主机位
    net_stats = defaultdict(lambda: {
        'total_ips': 254,
        'occupied': 0,
        'online': 0,
        'occupied_percent': 0.0,
        'online_percent': 0.0
    })
    # 组织每个网段的主机表
    for ip, info in data.items():
        try:
            ipaddress.IPv4Address(ip)
        except ipaddress.AddressValueError:
            continue
        net = ip_to_net(ip)
        last_octet = int(ip.split(".")[3])
        nets[net][last_octet] = info
        # 仅统计 1..254
        if 1  0:
            stats['occupied_percent'] = round((stats['occupied'] / stats['total_ips']) * 100, 1)
            stats['online_percent'] = round((stats['online'] / stats['total_ips']) * 100, 1)
    updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    html = f"""
IP地址管理系统
    body {{ font-family: Arial, sans-serif; margin: 20px; }}
    h1 {{ margin: 0 0 10px 0; }}
    .toolbar {{
        margin-bottom: 15px;
        display: flex;
        gap: 10px;
        align-items: center;
        flex-wrap: wrap;
    }}
    button {{
        padding: 6px 10px;
        border: 1px solid #ccc;
        background: #f7f7f7;
        border-radius: 6px;
        cursor: pointer;
    }}
    button:hover {{
        background: #eee;
    }}
    .legend {{
        margin: 20px 0;
        padding: 10px;
        border: 1px solid #ccc;
        border-radius: 8px;
        background: #f9f9f9;
        display: flex;
        flex-wrap: wrap;
        gap: 20px;
        align-items: center;
    }}
    .legend-item {{
        display: flex;
        align-items: center;
        gap: 6px;
        font-size: 14px;
    }}
    .legend-box {{
        width: 20px;
        height: 20px;
        border-radius: 4px;
        border: 1px solid #333;
    }}
    .net-block {{
        margin: 28px 0;
        border: 1px solid #e0e0e0;
        border-radius: 8px;
        padding: 15px;
        background: #fafafa;
    }}
    .net-header {{
        display: flex;
        justify-content: space-between;
        align-items: center;
        margin-bottom: 15px;
        padding-bottom: 10px;
        border-bottom: 1px solid #eee;
    }}
    .net-stats {{
        display: flex;
        gap: 20px;
        font-size: 14px;
        color: #666;
    }}
    .stat-item {{
        display: flex;
        flex-direction: column;
        align-items: center;
        padding: 8px 12px;
        background: white;
        border-radius: 6px;
        border: 1px solid #e0e0e0;
        min-width: 80px;
    }}
    .stat-value {{
        font-weight: bold;
        font-size: 16px;
        margin-bottom: 2px;
    }}
    .stat-label {{
        font-size: 12px;
        color: #888;
    }}
    .occupied-stat .stat-value {{ color: #ff6b35; }}
    .online-stat .stat-value {{ color: #28a745; }}
    .grid {{
        display: grid;
        grid-template-columns: repeat(32, 1fr);
        gap: 4px;
        width: 100%;
    }}
    .cell {{
        background-color: #ccc;
        color: #111;
        font-size: 13px;
        display: flex;
        flex-direction: column;
        justify-content: center;
        align-items: center;
        position: relative;
        border-radius: 6px;
        cursor: pointer;
        height: 58px;
        padding: 4px 2px 6px;
        box-sizing: border-box;
        transition: transform .05s ease-in-out;
    }}
    .cell:active {{ transform: scale(0.98); }}
    .ipnum {{ line-height: 1; }}
    .device-icon {{
        width: 24px; height: 24px;
        margin-bottom: 4px;
        display: block;
        pointer-events: none;
        filter: drop-shadow(0 0 1px rgba(0,0,0,.2));
    }}
    .online  {{ background-color: #28a745; color: #fff; }}
    .offline {{ background-color: #007bff; color: #fff; }}
    .reserved {{ background-color: #e9ecef; color: #999; cursor: not-allowed; }}
    .tooltip {{
        visibility: hidden;
        background-color: rgba(0, 0, 0, 0.85);
        color: #fff;
        text-align: left;
        padding: 6px 10px;
        border-radius: 6px;
        position: absolute;
        z-index: 10;
        font-size: 12px;
        bottom: 110%;
        left: 50%;
        transform: translateX(-50%);
        white-space: nowrap;
        pointer-events: none;
    }}
    .cell:hover .tooltip {{ visibility: visible; }}
    #modalOverlay {{
        display: none;
        position: fixed; inset: 0;
        background: rgba(0,0,0,.45); z-index: 999;
    }}
    #editModal {{
        display: none; position: fixed;
        top: 50%; left: 50%; transform: translate(-50%, -50%);
        width: 320px; background: #fff; border-radius: 10px;
        box-shadow: 0 10px 30px rgba(0,0,0,.25); z-index: 1000; padding: 16px;
    }}
    #editModal h3 {{ margin: 0 0 12px 0; }}
    #editModal label {{ font-size: 13px; color: #333; }}
    #editModal input, #editModal select {{
        width: 100%; margin: 6px 0 12px 0; padding: 6px 8px;
        font-size: 14px; border: 1px solid #ccc; border-radius: 6px; box-sizing: border-box;
    }}
    .modal-actions {{ display: flex; gap: 8px; justify-content: flex-end; }}
    .muted {{ color: #666; font-size: 12px; }}
    .status-badge {{
        padding: 2px 8px; border-radius: 12px; font-size: 12px; font-weight: bold;
    }}
    .status-enabled {{ background: #28a745; color: white; }}
    .status-disabled {{ background: #6c757d; color: white; }}

    IP 地址管理系统
   
        最后更新:{updated_at}
        手动扫描/刷新
        每十分钟左右自动扫描一次在线IP,无特殊情况无需手动扫描,通过浏览器刷新页面查看即可。
   
   
         空闲
         在线
         离线
         保留
        
[img][/img]
PC
        
[img][/img]
服务器
        
[img][/img]
打印机
        
[img][/img]
网络设备
   
"""
    for net, hosts in sorted(nets.items()):
        stats = net_stats[net]
        html += f"""
   
        
            网段 {net}.0/24
            
               
                    {stats['occupied']}/{stats['total_ips']}
                    占用 {stats['occupied_percent']}%
               
               
                    {stats['online']}/{stats['total_ips']}
                    在线 {stats['online_percent']}%
               
            
        
        """
        for i in range(256):
            ip = f"{net}.{i}"
            # 默认值(空闲)
            mac = "-"
            last_seen = "-"
            status = "idle"
            user = ""
            device_type = ""
            if i in hosts:
                info = hosts
                mac = info.get("mac", "-")
                last_seen = info.get("last_seen", "-")
                status = info.get("status", "idle")
                user = info.get("user", "")
                device_type = info.get("device_type", "")
            # 保留地址禁用编辑
            is_reserved = (i == 0 or i == 255)
            if is_reserved:
                cls = "cell reserved"
            else:
                # 决定 cell 类
                if status == "online":
                    cls = "cell online"
                elif status == "offline":
                    cls = "cell offline"
                else:
                    cls = "cell"
            # 图标(有类型才显示)
            icon_html = f"

" if device_type else ""
            # tooltip 内容
            tooltip_html = (
                f"IP: {safe(ip)}
"
                f"MAC: {safe(mac)}
"
                f"最后在线: {safe(last_seen)}
"
                f"使用人: {safe(user) if user else '未登记'}
"
                f"设备类型: {safe(device_type) if device_type else '未登记'}
"
                f"状态: {safe(status)}"
            )
            onclick_attr = "" if is_reserved else "onclick=\"openModalFromCell(this)\""
            html += f"""
            
                {icon_html}
                {i}
                {tooltip_html}
            """
        html += ""
    # 弹窗与脚本
    html += """
   
   
        编辑 IP 信息
        
            
            使用人
            
            设备类型
            
                未登记
                PC
                Server
                Print
                Net
            
            状态
            
                在线
                离线
                空闲
            
            
                取消
                保存
            
        
   

"""
    # 也写一份 index.html 到本地,便于离线打开
    try:
        with open("index.html", "w", encoding="utf-8") as f:
            f.write(html)
    except Exception as _:
        pass
    return html
# ----------------- HTTP 服务 -----------------
class Handler(SimpleHTTPRequestHandler):
    def do_GET(self):
        """
        - "/" 或 "/index.html":动态生成页面
        - "/refresh":触发扫描,返回 JSON
        - 其他路径:静态资源交由父类处理(用于 .png 图标等)
        """
        # 首页:动态生成 HTML
        if self.path == "/" or self.path.startswith("/index.html"):
            data = load_data()
            html = generate_html(data)
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.end_headers()
            self.wfile.write(html.encode("utf-8"))
            return
        # 手动扫描:刷新数据并返回 JSON
        if self.path == "/refresh":
            try:
                perform_scan()
                resp = {"ok": True, "message": "扫描完成"}
            except Exception as e:
                resp = {"ok": False, "error": f"扫描失败: {str(e)}"}
            js = json.dumps(resp, ensure_ascii=False).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.end_headers()
            self.wfile.write(js)
            return
        # 其他静态资源(如 .png 图标)使用父类默认处理
        return super().do_GET()
    def do_POST(self):
        """保存编辑:使用人 / 设备类型 / 状态。"""
        if self.path == "/update":
            length = int(self.headers.get("Content-Length", "0") or 0)
            body = self.rfile.read(length).decode("utf-8")
            params = parse_qs(body)
            ip = (params.get("ip", [""])[0] or "").strip()
            user = (params.get("user", [""])[0] or "").strip()
            device_type = (params.get("device_type", [""])[0] or "").strip()
            status = (params.get("status", [""])[0] or "").strip() or "idle"
            try:
                ipaddress.IPv4Address(ip)
            except Exception:
                return self._json({"ok": False, "error": "IP 地址不合法"})
            data = load_data()
            if ip not in data:
                # 新增记录(手工登记)
                data[ip] = {
                    "mac": data.get(ip, {}).get("mac", "-"),
                    "last_seen": data.get(ip, {}).get("last_seen", "-"),
                    "status": status,
                    "user": user,
                    "device_type": device_type
                }
            else:
                # 修改仅影响可手工项 + 状态;不动 mac/last_seen(除非本来不存在)
                data[ip]["user"] = user
                data[ip]["device_type"] = device_type
                data[ip]["status"] = status
                data[ip]["mac"] = data[ip].get("mac", "-")
                data[ip]["last_seen"] = data[ip].get("last_seen", "-")
            save_data(data)
            record = data[ip]
            return self._json({"ok": True, "record": {
                "ip": ip,
                "user": record.get("user", ""),
                "device_type": record.get("device_type", ""),
                "status": record.get("status", "idle"),
                "mac": record.get("mac", "-"),
                "last_seen": record.get("last_seen", "-"),
            }})
        # 未知 POST
        self.send_response(404)
        self.end_headers()
    # 小工具:返回 JSON
    def _json(self, obj, code=200):
        js = json.dumps(obj, ensure_ascii=False).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.end_headers()
        self.wfile.write(js)
def run():
    """启动服务;根据配置决定是否自动扫描。"""
    if AUTO_SCAN:
        # 启动自动扫描线程
        scanner_thread = threading.Thread(target=auto_scan_loop, daemon=True)
        scanner_thread.start()
        """
        # 执行一次初始扫描
        try:
            perform_scan()
        except Exception as e:
            print(f"[run] 初始扫描失败: {e}")
        """
    # 启动HTTP服务器
    httpd = HTTPServer((HOST, PORT), Handler)
    print(f"服务已启动:http://{HOST}:{PORT}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.server_close()
        print("服务已关闭。")
if __name__ == "__main__":
    run()

数据, 交换机

xiaomingtt
OP
  


locoman 发表于 2025-9-5 12:45
display arp
请教:
1. 是不是有重复项?

我想要的功能只是看看哪些IP地址被占用,哪些可以分配,是不是有重复无所谓。不在线的终端一次查不到不要紧,程序可以设置10分钟查询一次,7X24小时运行,只要终端上线就会被记录,还是根据我的需求,我不是每天都有寻找可用IP的需求,可能三两个月才找一次。关于老化就更没影响了,ARP老化是为了找到IP-MAC的正确对应关系,我需要的只是IP占用情况。
dork   

给需要使用的人提个醒:此代码不可以直接拿来用,其核心关键命令:
outtxt = connect.send_command(
                "display arp | include 172.10.", read_timeout=90
            )
是172.10网段的。一般单位不是这个,需要修改此处。
bester   

安全管理软件和准入系统 用的是啥
WHW117   

太给力了!
稻海香   

没有看到下载地址
liwei_show   

大佬有没有成品呀
rhci   

好了,这个需要核心交换机的支持,还需要是H3C的交换机,其他无办法,如果能变为本机扫描局域网机器获取,就好了。
locoman   

display arp
请教:
1. 是不是有重复项?
2. 不在线的终端是不是就查不到?
3. ARP 老化时间的影响怎么办?
谢谢大佬指点……
bdcpc   

完全看不懂
您需要登录后才可以回帖 登录 | 立即注册