1.开始:爬虫判断IP是否变法
1.如果变法调用to_at_msg函数,进入发送短信流程。
2.如果没有变法直接跳过。
2.配置文件存在?:检查是否存在配置文件。
1.如果存在,读取配置文件。
2.如果不存在,进入配置COM端口流程。
3.配置COM端口:调用 configure_com_port 函数让用户选择端口。
1.如果配置成功,进入测试波特率流程。
2.如果配置失败,返回错误信息。
4.测试波特率:遍历波特率列表,测试每个波特率。
1.如果测试成功,保存配置文件。
2.如果测试失败,返回错误信息。
5.读取配置文件:从配置文件中读取端口和波特率信息。
6.替换中文标点:调用 replace_chinese_punctuation 函数替换中文标点符号。
7.初始化短信模块:调用 serial_msg_init 函数初始化短信模块。
1.如果初始化成功,发送短信。
2.如果初始化失败,返回错误信息。
8.发送短信:发送短信并返回响应信息。
9.返回响应信息:返回最终结果给调用者。源码:
[Python] 纯文本查看 复制代码
import serial.tools.list_ports
import serial
import time
# from s_path import dtu_at_config_path
from os import path
import json
def list_com_ports():
"""
列出所有可用的串行端口(COM端口)。
该函数通过调用serial.tools.list_ports.comports()来获取系统中所有可用的串行端口列表。
如果没有找到任何端口,返回False和一个描述性字符串。
如果找到一个或多个端口,返回True和端口列表。
Returns:
tuple: 包含一个布尔值和一个消息或端口列表的元组。
- 如果没有找到端口,返回(False, "没有找到任何可用端口")。
- 如果找到端口,返回(True, ports)。
"""
# 获取所有可用的串行端口列表
ports = list(serial.tools.list_ports.comports())
# 检查是否找到任何端口
if len(ports) == 0:
# 没有找到端口,返回False和一个描述性字符串
return False, "没有找到任何可用端口"
else:
# 找到一个或多个端口,返回True和端口列表
return True, ports
# 配置COM端口函数
def configure_com_port():
"""
此函数用于配置和选择COM端口。它首先列出所有可用的COM端口,然后让用户选择一个端口。
如果用户输入无效或在指定次数内未选择有效的端口,函数将返回失败。
Returns:
成功时返回选定的COM端口,失败时返回包含错误信息的列表。
"""
# 获取所有可用的COM端口
com_ports = list_com_ports()
# 检查返回值是否是True
if not com_ports[0]:
# 如果获取COM端口失败,直接返回错误信息
return com_ports
com_ports = com_ports[1]
# 初始化计数器,用于限制用户选择端口的尝试次数
counter = 0
while counter 3:
response = ser.read(1024)
if len(response) == 0:
break
time.sleep(1)
# 读取串口返回的数据
response = ser.read(1024)
# 将返回的数据解码为字符串,忽略无法解码的字符
response_decoded = response.decode('GBK', errors='ignore')
# 将解码后的字符串添加到响应字符串中
response_str += response_decoded
# 如果所有命令都成功发送和接收,则返回 True 和所有响应数据
return True, response_str
except Exception as e:
# 如果发生异常,则返回 False 和异常信息
return False, str(e)
def serial_msg_init(com, bps, phone, msg, timeout=1):
"""
初始化短信模块,并发送短信。
Args:
com (str): 串口名称,如 'COM3'。
bps (int): 波特率,如 9600。
phone (str): 接收短信的手机号码。
msg (str): 要发送的短信内容。
timeout (float, optional): 串口读写超时时间,默认为 1 秒。
Returns:
tuple: 包含两个元素的元组,第一个元素为布尔值,表示操作是否成功;
第二个元素为字符串,包含所有响应数据或错误信息。
Raises:
无
"""
# 初始化用于存储所有响应的字符串
response_str = ''
try:
# 打开串口并设置参数
with serial.Serial(com, bps, timeout=timeout) as ser:
# 添加换行符并发送命令
ser.write(b'\x1a')
# 初始化模块配置,设置短信服务,设置短信TEXT模式、设置字符集UCS2设置为中文模式、保存配置.
commands = ['AT', 'AT+CSMS=1','AT+CMGF=1', 'AT+CSMP=17,167,0,8', 'AT&W']
# 遍历所有命令
for command in commands:
# 添加换行符并发送命令
command = command + '\r'
ser.write(command.encode())
# 等待一段时间确保命令发送完成
time.sleep(0.1)
# 读取串口返回的数据
response = ser.read(1024)
# 将返回的数据解码为字符串,忽略无法解码的字符
response_decoded = response.decode('GBK', errors='ignore')
if 'OK' in response_decoded:
# 将解码后的字符串添加到响应字符串中
response_str += response_decoded
else:
# 如果没有找到"OK",则将整个返回数据添加到响应字符串中
response_str += response_decoded
return False,response_str
# 等待一段时间确保命令发送完成
time.sleep(0.1)
# 读取串口返回的数据
response = ser.read(1024)
# 将返回的数据解码为字符串,忽略无法解码的字符
response_decoded = response.decode('GBK', errors='ignore')
# 将解码后的字符串添加到响应字符串中
response_str += response_decoded
time.sleep(0.1)
if 'OK' in response_str:
ser.write(f'AT+CMGS=\"{phone}\"\r'.encode())
time.sleep(0.1)
# 发送 utf-16-be编码的短信
ser.write(msg.encode('utf-16'))
# 添加换行符并发送命令
ser.write(b'\x1a')
for i in range(20):
if i > 3:
response = ser.read(1024)
if len(response) == 0:
break
time.sleep(1)
# 读取串口返回的数据
response = ser.read(1024)
# 将返回的数据解码为字符串,忽略无法解码的字符
response_decoded = response.decode('GBK', errors='ignore')
# 将解码后的字符串添加到响应字符串中
response_str += response_decoded
else:
return False, f'链接短信模块失败{response_str}'
# 如果所有命令都成功发送和接收,则返回 True 和所有响应数据
return True, response_str
except Exception as e:
# 如果发生异常,则返回 False 和异常信息
return False, str(e)
def save_at_config():
"""
保存4G-DTU的AT配置信息。
如果配置文件不存在,则进行端口和波特率的配置测试,直到找到合适的配置。
测试通过后,将配置信息保存到文件中。
Returns:
tuple: (bool, str) 表示配置是否成功以及相应的消息。
"""
if not path.exists('dtu_at_config.json'):
com = configure_com_port()
bps_list = [9600, 19200, 38400, 57600, 115200]
print(f"测试端口号为- {com} -")
for bps in bps_list:
try:
print(f"测试波特率为- {bps} -.")
commands = ['AT', 'AT+CSMS=1','AT+CMGF=1', 'AT+CSMP=17,167,0,8', 'AT&W']
respomse = serial_init(com, bps)
if respomse[0]:
if 'OK' in respomse[1]:
print(f"端口号为- {com} -,波特率为- {bps} -测试成功.")
print(f'初始化模块配置,设置短信TEXT模式、设置字符集UCS2、设置为中文模式、保存配置.')
respomse = serial_init(com, bps, commands)
if 'OK' in respomse[1]:
print(f'{respomse[1]}\n初始化配置成功.')
with open('dtu_at_config.json', 'w', encoding='utf-8') as f:
json.dump({'com': com, 'bps': bps},
f, ensure_ascii=False)
return True, '保存4G-DTU-AT配置成功'
else:
print(f"波特率测试失败.")
return False, "波特率测试失败."
except Exception as e:
print(f'保存4G-DTU-AT配置失败,错误信息:{e}')
return False, f'保存4G-DTU-AT配置失败,错误信息:{e}'
else:
return True, '4G-DTU-AT配置文件已存在.'
def replace_chinese_punctuation(text):
# 定义中文标点和对应的英文标点的映射
punctuation_mapping = {
',': ',',
'。': '.',
'!': '!',
'?': '?',
'“': '"',
'”': '"',
'‘': "'",
'’': "'",
':': ':',
';': ';',
'(': '(',
')': ')',
'【': '[',
'】': ']',
'《': '',
'—': '-',
'——': '--',
'…': '...',
'、': ',',
'·': '.',
'~': '~'
}
# 使用for循环调用replace()进行替换
for chinese_punc, english_punc in punctuation_mapping.items():
text = text.replace(chinese_punc, english_punc)
return text
def to_at_msg(phone=None, msg=None):
"""
发送AT命令消息。
本函数通过读取配置文件来初始化串口通信,以便发送AT命令。
它会尝试通过串口发送一条消息给指定的电话号码。
参数:
- phone (str): 接收消息的电话号码。默认为None。
- msg (str): 要发送的消息内容。默认为None。
返回:
- 成功时返回服务器的响应信息。
- 失败时返回一个包含错误信息的字符串。
"""
if save_at_config()[0]:
with open('dtu_at_config.json', 'r', encoding='utf-8') as f:
config = json.load(f)
com = config['com']
bps = config['bps']
msg=replace_chinese_punctuation(msg)
try:
# response=serial_init(com,bps,commands=['AT',f'AT+CMGS="{phone}"',msg,'b"\x1A"'],timeout=1)
response = serial_msg_init(com, bps, phone, msg, timeout=1)
if response[0]:
return response[1]
else:
raise Exception(response[1])
except Exception as e:
return f'AT命令执行失败,错误信息:{e}'
if __name__ == '__main__':
import requests
# 获取当前的 IPv4 地址
ipv4 = requests.get('https://ipv4.icanhazip.com').text.strip()
try:
with open('oldip.txt', 'r', encoding='utf-8') as file:
oldipv4 = file.read()
if oldipv4!=ipv4:
print(f'地址发送变法,最新地址:{ipv4},旧地址:{oldipv4}')
with open('oldip.txt', 'w', encoding='utf-8') as file:
file.write(ipv4)
to_at_msg(19973xxxx, f'地址:{ipv4}')
else:
print('IP信息无变法,跳过更新')
except FileNotFoundError:
to_at_msg(19973xxxx, f'地址:{ipv4}')
with open('oldip.txt', 'w', encoding='utf-8') as file:
file.write(ipv4)
print("文件不存在,创建空文件。")
创建python虚拟环境
[Bash shell] 纯文本查看 复制代码cd /home/chenmi/py_dm/dtu_4g/
python3 -m venv .venv
Linux运行脚本:切换到程序目录,使用python虚拟环境运行[Bash shell] 纯文本查看 复制代码
#!/bin/bash
cd /home/chenmi/py_dm/dtu_4g/
/home/chenmi/py_dm/dtu_4g/.dtu_venv/bin/python get_ip_dtu_4g.py >> /home/chenmi/py_dm/dtu_4g/output.log 2>&1
定时任务配置:每小时的第三分钟运行
3 * * * * /home/chenmi/py_dm/dtu_4g/start_get_ip_dtu_4g.sh
操作串口需要root权限