网络状态监控程序(Windows系统)

查看 11|回复 0
作者:hdxzd12   
import time
import socket
import threading
import psutil
import os
import winsound
class NetworkMonitor:
    def __init__(self):
        self.alert_sound = r"C:\Users\666\voice\alert.wav"
        self.connected_sound = r"C:\Users\666\voice\网络连接成功.wav"
        self.no_internet_sound = r"C:\Users\666\voice\网络连接失败.wav"
        self.disconnected_sound = r"C:\Users\666\voice\网络连接已断开.wav"
        
        self.previous_state = None
        self.check_interval = 5  # 检查间隔(秒)
        self.is_monitoring = False
        
    def play_sound(self, sound_file):
        """在单独的线程中播放声音"""
        def play():
            try:
                # 使用 winsound 播放 WAV 文件
                if os.path.exists(sound_file):
                    winsound.PlaySound(sound_file, winsound.SND_FILENAME)
                    print(f"播放声音: {os.path.basename(sound_file)}")
                else:
                    print(f"声音文件不存在: {sound_file}")
            except Exception as e:
                print(f"播放声音时出错: {e}")
        
        thread = threading.Thread(target=play)
        thread.daemon = True
        thread.start()
   
    def check_network_status(self):
        """检查网络连接状态"""
        # 检查本地网络连接
        local_connected = self.check_local_connection()
        
        if not local_connected:
            return "disconnected"
        
        # 检查互联网连接
        internet_connected = self.check_internet_connection()
        
        if internet_connected:
            return "connected"
        else:
            return "no_internet"
   
    def check_local_connection(self):
        """检查本地网络连接"""
        try:
            # 获取网络接口状态
            interfaces = psutil.net_if_stats()
            for interface_name, interface_stats in interfaces.items():
                # 跳过虚拟接口和未启用的接口
                if interface_name.startswith('Loopback') or interface_name.startswith('isatap'):
                    continue
                if interface_stats.isup:
                    # 获取接口地址信息
                    addrs = psutil.net_if_addrs().get(interface_name, [])
                    for addr in addrs:
                        # 检查是否有IPv4地址且不是本地回环
                        if addr.family == socket.AF_INET and not addr.address.startswith('127.'):
                            return True
            return False
        except Exception:
            return False
   
    def check_internet_connection(self, timeout=5):
        """检查互联网连接"""
        # 尝试连接多个可靠的服务来检测互联网连接
        test_hosts = [
            ("8.8.8.8", 53),  # Google DNS
            ("1.1.1.1", 53),  # Cloudflare DNS
            ("208.67.222.222", 53),  # OpenDNS
        ]
        
        for host, port in test_hosts:
            try:
                socket.setdefaulttimeout(timeout)
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                result = sock.connect_ex((host, port))
                sock.close()
                if result == 0:
                    return True
            except Exception:
                continue
        
        # 如果所有主机都连接失败,尝试HTTP连接
        try:
            import urllib.request
            urllib.request.urlopen('http://www.google.com/generate_204', timeout=timeout)
            return True
        except Exception:
            return False
   
    def handle_state_change(self, new_state):
        """处理状态变化"""
        print(f"网络状态变化: {self.previous_state} -> {new_state}")
        
        # 播放警报声
        self.play_sound(self.alert_sound)
        time.sleep(1)  # 等待警报声播放完毕
        
        # 根据新状态播放相应的语音
        if new_state == "connected":
            self.play_sound(self.connected_sound)
            print("网络连接成功")
        elif new_state == "no_internet":
            self.play_sound(self.no_internet_sound)
            print("网络连接失败 - 无法连接到互联网")
        elif new_state == "disconnected":
            self.play_sound(self.disconnected_sound)
            print("网络连接已断开")
   
    def start_monitoring(self):
        """开始监控"""
        self.is_monitoring = True
        print("开始监控网络连接...")
        print(f"检查间隔: {self.check_interval}秒")
        
        # 先检查初始状态
        initial_state = self.check_network_status()
        print(f"初始网络状态: {initial_state}")
        self.previous_state = initial_state
        
        try:
            while self.is_monitoring:
                current_state = self.check_network_status()
               
                # 如果状态发生变化
                if current_state != self.previous_state:
                    self.handle_state_change(current_state)
                    self.previous_state = current_state
               
                time.sleep(self.check_interval)
               
        except KeyboardInterrupt:
            print("\n监控已停止")
        except Exception as e:
            print(f"监控过程中出现错误: {e}")
   
    def stop_monitoring(self):
        """停止监控"""
        self.is_monitoring = False
def main():
    # 检查必要的依赖
    try:
        import psutil
    except ImportError as e:
        print(f"缺少必要的依赖库: {e}")
        print("请安装所需的库:")
        print("pip install psutil")
        return
   
    # 检查声音文件是否存在
    monitor = NetworkMonitor()
    sound_files = [
        monitor.alert_sound,
        monitor.connected_sound,
        monitor.no_internet_sound,
        monitor.disconnected_sound
    ]
   
    for sound_file in sound_files:
        if not os.path.exists(sound_file):
            print(f"警告: 声音文件不存在 - {sound_file}")
   
    print("网络连接监控脚本")
    print("按 Ctrl+C 停止监控")
   
    try:
        monitor.start_monitoring()
    except KeyboardInterrupt:
        print("\n监控已由用户停止")
if __name__ == "__main__":
    main()

网络, 状态

您需要登录后才可以回帖 登录 | 立即注册

返回顶部