有安全管理软件和准入系统,但网络设备、服务器、打印机等里面没有。
后来用PHPIPAM,后来加强了安全管理,PHPIPAM扫不到PC终端了。
后来用资产系统管理IP,因为更新不及时,更是不靠谱。
最后为了找一个能用的IP,经常要在准入系统、资产系统都查一遍,再在接入交换机查一遍才行。
也就是在接入交换机通过“display arp”命令(H3C)查询终端是否在线给了灵感。
我在核心交换机上用“display arp”一查,果然单位所有在线IP都能找到。
于是就用python做了这个程序,定时登录核心交换机执行“display arp”命令,并通过网页展示IP占用情况。

2025-09-05_084324.png (162.27 KB, 下载次数: 1)
下载附件
2025-9-5 11:26 上传
[Python] 纯文本查看 复制代码# -*- coding: utf-8 -*-
from http.server import SimpleHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs
from netmiko import ConnectHandler, NetMikoTimeoutException, NetMikoAuthenticationException
import json
import os
from datetime import datetime
from collections import defaultdict
import ipaddress
import html as htmllib
import threading
import time
# ----------------- 配置 -----------------
DATA_FILE = "data.json"
HOST = "0.0.0.0"
PORT = 10086
SCAN_INTERVAL = 600
AUTO_SCAN = True
# 交换机设备参数
device_yd = {
'device_type': "hp_comware",
'ip': "172.10.119.252",
'username': "admin",
'password': "Admin@1234",
'port': 22,
'session_log': 'h3clog.log',
'session_log_file_mode': 'append',
'timeout': 180,
'fast_cli': False,
'global_delay_factor': 1,
}
device_lt = {
'device_type': "hp_comware",
'ip': "172.10.119.253",
'username': "admin",
'password': "Admin@1234",
'port': 22,
'session_log': 'h3clog.log',
'session_log_file_mode': 'append',
'timeout': 180,
'fast_cli': False,
'global_delay_factor': 1,
}
# 线程安全的数据访问锁
data_lock = threading.Lock()
# ----------------- 数据读写 -----------------
def load_data():
"""线程安全读取 JSON 数据文件。"""
with data_lock:
if os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"[load_data] 加载数据文件失败: {e}")
return {}
return {}
def save_data(data):
"""线程安全写回 JSON 数据文件。"""
with data_lock:
try:
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
except Exception as e:
print(f"[save_data] 保存数据文件失败: {e}")
# ----------------- 交换机扫描逻辑 -----------------
def displayarp(device):
"""
登录设备,执行 ARP 查询。
"""
try:
with ConnectHandler(**device) as connect:
print(f"[displayarp] 连接到设备 {device['ip']} 成功")
outtxt = connect.send_command(
"display arp | include 172.10.", read_timeout=90
)
return outtxt
except NetMikoTimeoutException:
print(f"[displayarp] 设备 {device['ip']} 连接超时!")
except NetMikoAuthenticationException:
print(f"[displayarp] 设备 {device['ip']} 认证失败!")
except Exception as e:
print(f"[displayarp] 设备 {device['ip']} 未知错误: {e}")
return ""
def parse_output(output):
result = []
if not output:
return result
for line in output.strip().split("\n"):
parts = line.split()
if len(parts) >= 2:
ip, mac = parts[0], parts[1]
result.append({"ip": ip, "mac": mac})
return result
def update_data(parsed, data):
"""
规则:
- 本次扫描到的 IP 标记为 online,更新 mac/last_seen
- 数据中存在但本次未扫描到的 IP 标记为 offline
- user / device_type 不被程序覆盖(只保留已有值)
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
seen_ips = {item["ip"] for item in parsed}
# 扫描到的 -> online
for item in parsed:
ip = item["ip"]
mac = item["mac"]
if ip not in data:
data[ip] = {
"mac": mac,
"last_seen": now,
"status": "online",
"user": "",
"device_type": ""
}
else:
# 保留手工字段
user = data[ip].get("user", "")
device_type = data[ip].get("device_type", "")
data[ip].update({
"mac": mac,
"last_seen": now,
"status": "online",
"user": user,
"device_type": device_type
})
# 未扫描到的 -> offline(只处理曾经见过的)
for ip in list(data.keys()):
if ip not in seen_ips and "last_seen" in data[ip]:
data[ip]["status"] = "offline"
return data
def perform_scan():
"""执行扫描并更新数据。"""
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 开始自动扫描...")
# 从两个设备获取 ARP 信息
output1 = displayarp(device_yd)
output2 = displayarp(device_lt)
# 解析输出
parsed = parse_output(output1) + parse_output(output2)
# 更新数据
data = load_data()
data = update_data(parsed, data)
save_data(data)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 扫描完成,找到 {len(parsed)} 条记录")
def auto_scan_loop():
"""自动扫描循环。"""
while True:
try:
perform_scan()
except Exception as e:
print(f"[auto_scan_loop] 自动扫描出错: {e}")
time.sleep(SCAN_INTERVAL)
# ----------------- HTML 生成 -----------------
def ip_to_net(ip):
"""取网段前三位,例如 172.10.116.251 -> 172.10.116"""
return ".".join(ip.split(".")[:3])
def safe(s):
"""HTML 转义输出。"""
return htmllib.escape(str(s if s is not None else ""), quote=True)
def generate_html(data):
"""
返回完整 HTML(同时也会写一个 index.html 到磁盘,方便离线查看)
"""
nets = defaultdict(dict)
# 统计容器:occupied/online 均只统计 1..254 主机位
net_stats = defaultdict(lambda: {
'total_ips': 254,
'occupied': 0,
'online': 0,
'occupied_percent': 0.0,
'online_percent': 0.0
})
# 组织每个网段的主机表
for ip, info in data.items():
try:
ipaddress.IPv4Address(ip)
except ipaddress.AddressValueError:
continue
net = ip_to_net(ip)
last_octet = int(ip.split(".")[3])
nets[net][last_octet] = info
# 仅统计 1..254
if 1 0:
stats['occupied_percent'] = round((stats['occupied'] / stats['total_ips']) * 100, 1)
stats['online_percent'] = round((stats['online'] / stats['total_ips']) * 100, 1)
updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
html = f"""
IP地址管理系统
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1 {{ margin: 0 0 10px 0; }}
.toolbar {{
margin-bottom: 15px;
display: flex;
gap: 10px;
align-items: center;
flex-wrap: wrap;
}}
button {{
padding: 6px 10px;
border: 1px solid #ccc;
background: #f7f7f7;
border-radius: 6px;
cursor: pointer;
}}
button:hover {{
background: #eee;
}}
.legend {{
margin: 20px 0;
padding: 10px;
border: 1px solid #ccc;
border-radius: 8px;
background: #f9f9f9;
display: flex;
flex-wrap: wrap;
gap: 20px;
align-items: center;
}}
.legend-item {{
display: flex;
align-items: center;
gap: 6px;
font-size: 14px;
}}
.legend-box {{
width: 20px;
height: 20px;
border-radius: 4px;
border: 1px solid #333;
}}
.net-block {{
margin: 28px 0;
border: 1px solid #e0e0e0;
border-radius: 8px;
padding: 15px;
background: #fafafa;
}}
.net-header {{
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
padding-bottom: 10px;
border-bottom: 1px solid #eee;
}}
.net-stats {{
display: flex;
gap: 20px;
font-size: 14px;
color: #666;
}}
.stat-item {{
display: flex;
flex-direction: column;
align-items: center;
padding: 8px 12px;
background: white;
border-radius: 6px;
border: 1px solid #e0e0e0;
min-width: 80px;
}}
.stat-value {{
font-weight: bold;
font-size: 16px;
margin-bottom: 2px;
}}
.stat-label {{
font-size: 12px;
color: #888;
}}
.occupied-stat .stat-value {{ color: #ff6b35; }}
.online-stat .stat-value {{ color: #28a745; }}
.grid {{
display: grid;
grid-template-columns: repeat(32, 1fr);
gap: 4px;
width: 100%;
}}
.cell {{
background-color: #ccc;
color: #111;
font-size: 13px;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
position: relative;
border-radius: 6px;
cursor: pointer;
height: 58px;
padding: 4px 2px 6px;
box-sizing: border-box;
transition: transform .05s ease-in-out;
}}
.cell:active {{ transform: scale(0.98); }}
.ipnum {{ line-height: 1; }}
.device-icon {{
width: 24px; height: 24px;
margin-bottom: 4px;
display: block;
pointer-events: none;
filter: drop-shadow(0 0 1px rgba(0,0,0,.2));
}}
.online {{ background-color: #28a745; color: #fff; }}
.offline {{ background-color: #007bff; color: #fff; }}
.reserved {{ background-color: #e9ecef; color: #999; cursor: not-allowed; }}
.tooltip {{
visibility: hidden;
background-color: rgba(0, 0, 0, 0.85);
color: #fff;
text-align: left;
padding: 6px 10px;
border-radius: 6px;
position: absolute;
z-index: 10;
font-size: 12px;
bottom: 110%;
left: 50%;
transform: translateX(-50%);
white-space: nowrap;
pointer-events: none;
}}
.cell:hover .tooltip {{ visibility: visible; }}
#modalOverlay {{
display: none;
position: fixed; inset: 0;
background: rgba(0,0,0,.45); z-index: 999;
}}
#editModal {{
display: none; position: fixed;
top: 50%; left: 50%; transform: translate(-50%, -50%);
width: 320px; background: #fff; border-radius: 10px;
box-shadow: 0 10px 30px rgba(0,0,0,.25); z-index: 1000; padding: 16px;
}}
#editModal h3 {{ margin: 0 0 12px 0; }}
#editModal label {{ font-size: 13px; color: #333; }}
#editModal input, #editModal select {{
width: 100%; margin: 6px 0 12px 0; padding: 6px 8px;
font-size: 14px; border: 1px solid #ccc; border-radius: 6px; box-sizing: border-box;
}}
.modal-actions {{ display: flex; gap: 8px; justify-content: flex-end; }}
.muted {{ color: #666; font-size: 12px; }}
.status-badge {{
padding: 2px 8px; border-radius: 12px; font-size: 12px; font-weight: bold;
}}
.status-enabled {{ background: #28a745; color: white; }}
.status-disabled {{ background: #6c757d; color: white; }}
IP 地址管理系统
最后更新:{updated_at}
手动扫描/刷新
每十分钟左右自动扫描一次在线IP,无特殊情况无需手动扫描,通过浏览器刷新页面查看即可。
空闲
在线
离线
保留
[img][/img]
PC
[img][/img]
服务器
[img][/img]
打印机
[img][/img]
网络设备
"""
for net, hosts in sorted(nets.items()):
stats = net_stats[net]
html += f"""
网段 {net}.0/24
{stats['occupied']}/{stats['total_ips']}
占用 {stats['occupied_percent']}%
{stats['online']}/{stats['total_ips']}
在线 {stats['online_percent']}%
"""
for i in range(256):
ip = f"{net}.{i}"
# 默认值(空闲)
mac = "-"
last_seen = "-"
status = "idle"
user = ""
device_type = ""
if i in hosts:
info = hosts
mac = info.get("mac", "-")
last_seen = info.get("last_seen", "-")
status = info.get("status", "idle")
user = info.get("user", "")
device_type = info.get("device_type", "")
# 保留地址禁用编辑
is_reserved = (i == 0 or i == 255)
if is_reserved:
cls = "cell reserved"
else:
# 决定 cell 类
if status == "online":
cls = "cell online"
elif status == "offline":
cls = "cell offline"
else:
cls = "cell"
# 图标(有类型才显示)
icon_html = f"
}.png)
" if device_type else ""
# tooltip 内容
tooltip_html = (
f"IP: {safe(ip)}
"
f"MAC: {safe(mac)}
"
f"最后在线: {safe(last_seen)}
"
f"使用人: {safe(user) if user else '未登记'}
"
f"设备类型: {safe(device_type) if device_type else '未登记'}
"
f"状态: {safe(status)}"
)
onclick_attr = "" if is_reserved else "onclick=\"openModalFromCell(this)\""
html += f"""
{icon_html}
{i}
{tooltip_html}
"""
html += ""
# 弹窗与脚本
html += """
编辑 IP 信息
使用人
设备类型
未登记
PC
Server
Net
状态
在线
离线
空闲
取消
保存
"""
# 也写一份 index.html 到本地,便于离线打开
try:
with open("index.html", "w", encoding="utf-8") as f:
f.write(html)
except Exception as _:
pass
return html
# ----------------- HTTP 服务 -----------------
class Handler(SimpleHTTPRequestHandler):
def do_GET(self):
"""
- "/" 或 "/index.html":动态生成页面
- "/refresh":触发扫描,返回 JSON
- 其他路径:静态资源交由父类处理(用于 .png 图标等)
"""
# 首页:动态生成 HTML
if self.path == "/" or self.path.startswith("/index.html"):
data = load_data()
html = generate_html(data)
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(html.encode("utf-8"))
return
# 手动扫描:刷新数据并返回 JSON
if self.path == "/refresh":
try:
perform_scan()
resp = {"ok": True, "message": "扫描完成"}
except Exception as e:
resp = {"ok": False, "error": f"扫描失败: {str(e)}"}
js = json.dumps(resp, ensure_ascii=False).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.end_headers()
self.wfile.write(js)
return
# 其他静态资源(如 .png 图标)使用父类默认处理
return super().do_GET()
def do_POST(self):
"""保存编辑:使用人 / 设备类型 / 状态。"""
if self.path == "/update":
length = int(self.headers.get("Content-Length", "0") or 0)
body = self.rfile.read(length).decode("utf-8")
params = parse_qs(body)
ip = (params.get("ip", [""])[0] or "").strip()
user = (params.get("user", [""])[0] or "").strip()
device_type = (params.get("device_type", [""])[0] or "").strip()
status = (params.get("status", [""])[0] or "").strip() or "idle"
try:
ipaddress.IPv4Address(ip)
except Exception:
return self._json({"ok": False, "error": "IP 地址不合法"})
data = load_data()
if ip not in data:
# 新增记录(手工登记)
data[ip] = {
"mac": data.get(ip, {}).get("mac", "-"),
"last_seen": data.get(ip, {}).get("last_seen", "-"),
"status": status,
"user": user,
"device_type": device_type
}
else:
# 修改仅影响可手工项 + 状态;不动 mac/last_seen(除非本来不存在)
data[ip]["user"] = user
data[ip]["device_type"] = device_type
data[ip]["status"] = status
data[ip]["mac"] = data[ip].get("mac", "-")
data[ip]["last_seen"] = data[ip].get("last_seen", "-")
save_data(data)
record = data[ip]
return self._json({"ok": True, "record": {
"ip": ip,
"user": record.get("user", ""),
"device_type": record.get("device_type", ""),
"status": record.get("status", "idle"),
"mac": record.get("mac", "-"),
"last_seen": record.get("last_seen", "-"),
}})
# 未知 POST
self.send_response(404)
self.end_headers()
# 小工具:返回 JSON
def _json(self, obj, code=200):
js = json.dumps(obj, ensure_ascii=False).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.end_headers()
self.wfile.write(js)
def run():
"""启动服务;根据配置决定是否自动扫描。"""
if AUTO_SCAN:
# 启动自动扫描线程
scanner_thread = threading.Thread(target=auto_scan_loop, daemon=True)
scanner_thread.start()
"""
# 执行一次初始扫描
try:
perform_scan()
except Exception as e:
print(f"[run] 初始扫描失败: {e}")
"""
# 启动HTTP服务器
httpd = HTTPServer((HOST, PORT), Handler)
print(f"服务已启动:http://{HOST}:{PORT}")
try:
httpd.serve_forever()
except KeyboardInterrupt:
httpd.server_close()
print("服务已关闭。")
if __name__ == "__main__":
run()