在Linux下python代码操作短信模块

查看 106|回复 9
作者:chenmi2017   
程序运行流程:
1.开始:爬虫判断IP是否变法
           1.如果变法调用to_at_msg函数,进入发送短信流程。
        2.如果没有变法直接跳过。
2.配置文件存在?:检查是否存在配置文件。
       1.如果存在,读取配置文件。
        2.如果不存在,进入配置COM端口流程。
3.配置COM端口:调用 configure_com_port 函数让用户选择端口。
       1.如果配置成功,进入测试波特率流程。
       2.如果配置失败,返回错误信息。
4.测试波特率:遍历波特率列表,测试每个波特率。
       1.如果测试成功,保存配置文件。
       2.如果测试失败,返回错误信息。
5.读取配置文件:从配置文件中读取端口和波特率信息。
6.替换中文标点:调用 replace_chinese_punctuation 函数替换中文标点符号。
7.初始化短信模块:调用 serial_msg_init 函数初始化短信模块。
       1.如果初始化成功,发送短信。
       2.如果初始化失败,返回错误信息。
8.发送短信:发送短信并返回响应信息。
9.返回响应信息:返回最终结果给调用者。源码:
[Python] 纯文本查看 复制代码
import serial.tools.list_ports
import serial
import time
# from s_path import dtu_at_config_path
from os import path
import json
def list_com_ports():
    """
    列出所有可用的串行端口(COM端口)。
    该函数通过调用serial.tools.list_ports.comports()来获取系统中所有可用的串行端口列表。
    如果没有找到任何端口,返回False和一个描述性字符串。
    如果找到一个或多个端口,返回True和端口列表。
    Returns:
        tuple: 包含一个布尔值和一个消息或端口列表的元组。
            - 如果没有找到端口,返回(False, "没有找到任何可用端口")。
            - 如果找到端口,返回(True, ports)。
    """
    # 获取所有可用的串行端口列表
    ports = list(serial.tools.list_ports.comports())
    # 检查是否找到任何端口
    if len(ports) == 0:
        # 没有找到端口,返回False和一个描述性字符串
        return False, "没有找到任何可用端口"
    else:
        # 找到一个或多个端口,返回True和端口列表
        return True, ports
# 配置COM端口函数
def configure_com_port():
    """
    此函数用于配置和选择COM端口。它首先列出所有可用的COM端口,然后让用户选择一个端口。
    如果用户输入无效或在指定次数内未选择有效的端口,函数将返回失败。
    Returns:
        成功时返回选定的COM端口,失败时返回包含错误信息的列表。
    """
    # 获取所有可用的COM端口
    com_ports = list_com_ports()
    # 检查返回值是否是True
    if not com_ports[0]:
        # 如果获取COM端口失败,直接返回错误信息
        return com_ports
    com_ports = com_ports[1]
    # 初始化计数器,用于限制用户选择端口的尝试次数
    counter = 0
    while counter  3:
                    response = ser.read(1024)
                    if len(response) == 0:
                        break
                time.sleep(1)
                # 读取串口返回的数据
                response = ser.read(1024)
                # 将返回的数据解码为字符串,忽略无法解码的字符
                response_decoded = response.decode('GBK', errors='ignore')
                # 将解码后的字符串添加到响应字符串中
                response_str += response_decoded
        # 如果所有命令都成功发送和接收,则返回 True 和所有响应数据
        return True, response_str
    except Exception as e:
        # 如果发生异常,则返回 False 和异常信息
        return False, str(e)
def serial_msg_init(com, bps, phone, msg, timeout=1):
    """
    初始化短信模块,并发送短信。
    Args:
        com (str): 串口名称,如 'COM3'。
        bps (int): 波特率,如 9600。
        phone (str): 接收短信的手机号码。
        msg (str): 要发送的短信内容。
        timeout (float, optional): 串口读写超时时间,默认为 1 秒。
    Returns:
        tuple: 包含两个元素的元组,第一个元素为布尔值,表示操作是否成功;
              第二个元素为字符串,包含所有响应数据或错误信息。
    Raises:
        无
    """
    # 初始化用于存储所有响应的字符串
    response_str = ''
    try:
        # 打开串口并设置参数
        with serial.Serial(com, bps, timeout=timeout) as ser:
            # 添加换行符并发送命令
            ser.write(b'\x1a')
            # 初始化模块配置,设置短信服务,设置短信TEXT模式、设置字符集UCS2设置为中文模式、保存配置.
            commands = ['AT', 'AT+CSMS=1','AT+CMGF=1', 'AT+CSMP=17,167,0,8', 'AT&W']
            # 遍历所有命令
            for command in commands:
                # 添加换行符并发送命令
                command = command + '\r'
                ser.write(command.encode())
                # 等待一段时间确保命令发送完成
                time.sleep(0.1)
                # 读取串口返回的数据
                response = ser.read(1024)
                # 将返回的数据解码为字符串,忽略无法解码的字符
                response_decoded = response.decode('GBK', errors='ignore')
                if 'OK' in response_decoded:
                    # 将解码后的字符串添加到响应字符串中
                    response_str += response_decoded
                else:
                    # 如果没有找到"OK",则将整个返回数据添加到响应字符串中
                    response_str += response_decoded
                    return False,response_str
            # 等待一段时间确保命令发送完成
            time.sleep(0.1)
            # 读取串口返回的数据
            response = ser.read(1024)
            # 将返回的数据解码为字符串,忽略无法解码的字符
            response_decoded = response.decode('GBK', errors='ignore')
            # 将解码后的字符串添加到响应字符串中
            response_str += response_decoded
            time.sleep(0.1)
            if 'OK' in response_str:
                ser.write(f'AT+CMGS=\"{phone}\"\r'.encode())
                time.sleep(0.1)
                # 发送 utf-16-be编码的短信
                ser.write(msg.encode('utf-16'))
                # 添加换行符并发送命令
                ser.write(b'\x1a')
                for i in range(20):
                    if i > 3:
                        response = ser.read(1024)
                        if len(response) == 0:
                            break
                    time.sleep(1)
                    # 读取串口返回的数据
                    response = ser.read(1024)
                    # 将返回的数据解码为字符串,忽略无法解码的字符
                    response_decoded = response.decode('GBK', errors='ignore')
                    # 将解码后的字符串添加到响应字符串中
                    response_str += response_decoded
            else:
                return False, f'链接短信模块失败{response_str}'
        # 如果所有命令都成功发送和接收,则返回 True 和所有响应数据
        return True, response_str
    except Exception as e:
        # 如果发生异常,则返回 False 和异常信息
        return False, str(e)
def save_at_config():
    """
    保存4G-DTU的AT配置信息。
    如果配置文件不存在,则进行端口和波特率的配置测试,直到找到合适的配置。
    测试通过后,将配置信息保存到文件中。
    Returns:
        tuple: (bool, str) 表示配置是否成功以及相应的消息。
    """
    if not path.exists('dtu_at_config.json'):
        com = configure_com_port()
        bps_list = [9600, 19200, 38400, 57600, 115200]
        print(f"测试端口号为- {com} -")
        for bps in bps_list:
            try:
                print(f"测试波特率为- {bps} -.")
                commands = ['AT', 'AT+CSMS=1','AT+CMGF=1', 'AT+CSMP=17,167,0,8', 'AT&W']
                respomse = serial_init(com, bps)
                if respomse[0]:
                    if 'OK' in respomse[1]:
                        print(f"端口号为- {com} -,波特率为- {bps} -测试成功.")
                        print(f'初始化模块配置,设置短信TEXT模式、设置字符集UCS2、设置为中文模式、保存配置.')
                        respomse = serial_init(com, bps, commands)
                        if 'OK' in respomse[1]:
                            print(f'{respomse[1]}\n初始化配置成功.')
                        with open('dtu_at_config.json', 'w', encoding='utf-8') as f:
                            json.dump({'com': com, 'bps': bps},
                                      f, ensure_ascii=False)
                        return True, '保存4G-DTU-AT配置成功'
                else:
                    print(f"波特率测试失败.")
                    return False, "波特率测试失败."
            except Exception as e:
                print(f'保存4G-DTU-AT配置失败,错误信息:{e}')   
                return False, f'保存4G-DTU-AT配置失败,错误信息:{e}'
    else:
        return True, '4G-DTU-AT配置文件已存在.'
def replace_chinese_punctuation(text):
    # 定义中文标点和对应的英文标点的映射
    punctuation_mapping = {
        ',': ',',
        '。': '.',
        '!': '!',
        '?': '?',
        '“': '"',
        '”': '"',
        '‘': "'",
        '’': "'",
        ':': ':',
        ';': ';',
        '(': '(',
        ')': ')',
        '【': '[',
        '】': ']',
        '《': '',
        '—': '-',
        '——': '--',
        '…': '...',
        '、': ',',
        '·': '.',
        '~': '~'
    }
    # 使用for循环调用replace()进行替换
    for chinese_punc, english_punc in punctuation_mapping.items():
        text = text.replace(chinese_punc, english_punc)
    return text
def to_at_msg(phone=None, msg=None):
    """
    发送AT命令消息。
    本函数通过读取配置文件来初始化串口通信,以便发送AT命令。
    它会尝试通过串口发送一条消息给指定的电话号码。
    参数:
    - phone (str): 接收消息的电话号码。默认为None。
    - msg (str): 要发送的消息内容。默认为None。
    返回:
    - 成功时返回服务器的响应信息。
    - 失败时返回一个包含错误信息的字符串。
    """
    if save_at_config()[0]:
        with open('dtu_at_config.json', 'r', encoding='utf-8') as f:
            config = json.load(f)
            com = config['com']
            bps = config['bps']
            msg=replace_chinese_punctuation(msg)
        try:
            # response=serial_init(com,bps,commands=['AT',f'AT+CMGS="{phone}"',msg,'b"\x1A"'],timeout=1)
            response = serial_msg_init(com, bps, phone, msg, timeout=1)
            if response[0]:
                return response[1]
            else:
                raise Exception(response[1])
        except Exception as e:
            return f'AT命令执行失败,错误信息:{e}'
if __name__ == '__main__':   
    import requests
    # 获取当前的 IPv4 地址
    ipv4 = requests.get('https://ipv4.icanhazip.com').text.strip()
    try:
        with open('oldip.txt', 'r', encoding='utf-8') as file:
            oldipv4 = file.read()
        if oldipv4!=ipv4:
            print(f'地址发送变法,最新地址:{ipv4},旧地址:{oldipv4}')
            with open('oldip.txt', 'w', encoding='utf-8') as file:
                file.write(ipv4)
                to_at_msg(19973xxxx, f'地址:{ipv4}')
        else:
            print('IP信息无变法,跳过更新')
    except FileNotFoundError:
        to_at_msg(19973xxxx, f'地址:{ipv4}')
        with open('oldip.txt', 'w', encoding='utf-8') as file:
            file.write(ipv4)
            print("文件不存在,创建空文件。")
创建python虚拟环境
[Bash shell] 纯文本查看 复制代码cd /home/chenmi/py_dm/dtu_4g/
python3 -m venv .venv
Linux运行脚本:切换到程序目录,使用python虚拟环境运行[Bash shell] 纯文本查看 复制代码
#!/bin/bash
cd /home/chenmi/py_dm/dtu_4g/
/home/chenmi/py_dm/dtu_4g/.dtu_venv/bin/python get_ip_dtu_4g.py >> /home/chenmi/py_dm/dtu_4g/output.log 2>&1
定时任务配置:每小时的第三分钟运行
3 * * * * /home/chenmi/py_dm/dtu_4g/start_get_ip_dtu_4g.sh
操作串口需要root权限

端口, 字符串

wangxuebiao   

好用好用好用
kongson   

感谢楼主分享!
Elske   

感谢楼主分享!
lypxynok   

串口命令不熟,先收藏着,感谢楼主
ss63551234ss   

感谢楼主分享!
Seagant   

感谢分享,跟楼主学习学习~
qzcwuaipojie   

这个短信模块其实还是比较常见的需求,可以采用试试,感觉不错
shiyu1   

感谢楼主分享,一直很想学习这一块
xieyinghao   

感谢楼主分享,学习
您需要登录后才可以回帖 登录 | 立即注册

返回顶部