注意事项:
在关闭服务端和用户端时可能会出现“正在等待进程分离”这种情况。
192.168.137.1是电脑热点的网关,如果不是请自己找网关ip,实在不行也可以用电脑上网的ip(比如:192.168.2.27)。
用户端可以在电脑或手机上运行,主要以cmd控制电脑,使用命令如:start "" "C:\Program Files (x86)"时代码会卡住,除非你去把用命令打开的窗口关闭。
手机端py运行软件:https://wwad.lanzn.com/iiJav2gn5kna
局域网传文件软件(手机软件):https://wwad.lanzn.com/iDKBF2gn5pfc
接下来是源码和效果图。
服务端:
[Python] 纯文本查看 复制代码import socket
import keyboard
import subprocess
import threading
import logging
import queue
'''
吾爱破解论坛 http://www.52pojie.cn
'''
'''
import win32gui
import win32con
# 获取当前窗口句柄
hwnd = win32gui.GetForegroundWindow()
# 设置窗口属性,隐藏窗口
win32gui.ShowWindow(hwnd, win32con.SW_HIDE)
'''
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
print("客户端登录账号:root,密码:123456,F3关闭服务器")
print()
ip1 = '192.168.137.1' # 默认ip地址
print(f"请输入服务器IP地址(电脑的IPv4地址),不输入默认为 {ip1}")
# 超时设置,15秒
def get_input(prompt, timeout, result_queue):
try:
user_input = input(prompt)
if user_input.strip() == "":
result_queue.put(None) # 如果用户直接回车,返回None
else:
result_queue.put(user_input)
except Exception as e:
logging.error(f"输入过程中发生错误: {e}")
# 创建一个线程来获取输入
print("超时时间---15秒")
print()
result_queue = queue.Queue()
input_thread = threading.Thread(target=get_input, args=("请输入服务器IP地址:", 15, result_queue))
input_thread.start()
# 等待输入,如果超时则返回默认值
input_thread.join(15)
if not result_queue.empty():
ip = result_queue.get()
if ip is None:
ip = ip1 # 如果用户直接回车,使用默认值
else:
ip = ip1
print(f"最终IP地址: {ip}")
'''
def get_ipv4_address():
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
return ip_address
ip=get_ipv4_address()
print(ip)
'''
# 创建一个 TCP/IP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置 SO_REUSEADDR 选项
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定服务器地址和端口
server_address = (ip, 5000)
server_socket.bind(server_address)
# 监听连接
server_socket.listen(5) # 支持最多5个并发连接
print('服务器正在等待连接...')
def close_server():
print('服务器已关闭')
server_socket.close()
exit()
# 监听 F3 键关闭服务器
keyboard.add_hotkey('F3', close_server)
def handle_client(client_socket, client_address):
try:
print('连接来自:', client_address)
# 接收账号和密码
data = client_socket.recv(1024)
logging.debug(f"接收到的数据: {data.decode()}")
# 解析账号和密码
try:
account, password = data.decode().split(':')
except ValueError:
logging.error("账号和密码格式错误")
message = '账号和密码格式错误!'
client_socket.send(message.encode())
return
# 验证账号和密码
if account == 'root' and password == '123456':
# 发送成功消息
message = '登录成功!'
client_socket.send(message.encode())
# 接收客户端请求
while True:
request = client_socket.recv(1024).decode()
logging.debug(f"接收到的请求: {request}")
if request == '1':
# 关机
subprocess.call('shutdown /s /t 0', shell=True)
elif request == '2':
# 重启
subprocess.call('shutdown /r /t 0', shell=True)
elif request == '3':
# 休眠
subprocess.call('rundll32.exe powrprof.dll,SetSuspendState 0,1,0', shell=True)
elif request == '4':
# 锁定
subprocess.call('rundll32.exe user32.dll,LockWorkStation', shell=True)
elif request == '0':
# 退出登录
print(client_address, "退出登录")
break
elif request == '5':
# 等待用户输入
user_input = 1
while True:
# 执行cmd命令
command = client_socket.recv(1024).decode()
logging.debug(f"接收到的命令:{command}")
try:
# 执行命令
result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# 获取命令的输出
output, error = result.communicate()
# 打印命令的输出到本地控制台
logging.debug(f"{output}")
# 发送命令的输出
if error:
output = " "
client_socket.sendall(output.encode())
logging.debug(f"{error}")
# 发送命令的错误
client_socket.sendall(error.encode())
logging.debug("命令结果已发送")
except subprocess.CalledProcessError as e:
logging.error(f"{e.stderr}")
# 发送错误信息
client_socket.sendall(str(e).encode())
finally:
# 发送一个结束标记
client_socket.sendall(' '.encode())
# 等待用户输入
#接收客户端的user_input值
user_input = client_socket.recv(1024).decode()
logging.debug(f"接收到的请求: {user_input}")
if user_input == '1':
# 继续执行命令
continue
elif user_input == '2':
print(client_address, "退出登录")
# 退出循环
break
else:
# 无效输入
client_socket.sendall('无效输入'.encode())
# 继续执行命令
continue
else:
# 无效输入
client_socket.sendall('无效输入'.encode())
else:
# 无效请求
message = '无效请求!'
client_socket.send(message.encode())
else:
# 发送失败消息
message = '账号或密码错误!'
client_socket.send(message.encode())
except Exception as e:
logging.error(f"处理客户端连接时发生错误: {e}")
finally:
# 关闭连接
client_socket.close()
while True:
try:
# 等待客户端连接
client_socket, client_address = server_socket.accept()
logging.debug(f"接受到来自 {client_address} 的连接")
# 创建新线程处理客户端连接
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
client_thread.start()
except OSError as e:
if e.errno == 10038:
print('服务器已关闭')
break
else:
raise
input("按回车键退出...")
用户端:
[Python] 纯文本查看 复制代码import socket
'''
吾爱破解论坛 http://www.52pojie.cn
'''
def run_script():
try:
print()
ip1 = '192.168.137.1'
print("请输入服务器的IP地址,不输入默认ip为", ip1)
print()
ip = input("请输入服务器的IP地址:") or ip1
print()
# 服务器地址和端口
server_address = (ip, 5000)
# 创建一个 TCP/IP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接到服务器
client_socket.connect(server_address)
print('已连接到服务器')
# 获取用户输入的账号和密码
account = input('请输入账号:') or 'root'
password = input('请输入密码:') or '123456'
# 发送账号和密码
message = f'{account}:{password}'
client_socket.sendall(message.encode())
# 接收服务器响应
response = client_socket.recv(1024).decode()
print('服务器响应:', response)
if response == '登录成功!':
print('登录成功')
print()
# 发送默认请求
print("[0]退出登录---[1]关机---[2]重启---[3]休眠")
print("[4]锁定---[5]执行cmd命令")
print()
request = input('输入数字: ') or "5" # 你可以修改这个请求
if request in ['0', '1', '2', '3', '4', '5']:
client_socket.sendall(request.encode())
print("请求已发送")
if request == '0':
print("退出登录成功")
elif request == '5':
while True:
# 获取用户输入的命令
print()
command = input('请输入要执行的命令:') or "echo %date% %time%"
# 发送命令
client_socket.sendall(command.encode())
# 设置超时时间
client_socket.settimeout(60)
try:
print()
print("接收超时时间为60秒")
print()
# 接收命令的输出
data = b''
count = 0
while count
效果图: