import datetime
import snap7
import requests
import json
from snap7 import util
# 初始化变量
value7 = None
start_time = None
end_time = None
while True:
try:
time.sleep(10)
# 创建一个客户端对象
client = snap7.client.Client()
#设置链接类型
client.set_connection_type(1)
# 连接到PLC
client.connect('10.112.14.111', 0, 4,102)
# 读取PLC数据
# 读DB182.DBD30生产数量
data = client.read_area(snap7.types.Areas.DB, 182, 0, 34)
value2 = util.get_dword(data, 30)
# print(f"当前生产数量为: {value2}")
# 读DB182.DBW0,模具号
data = client.read_area(snap7.types.Areas.DB, 182, 0, 2)
value1 = util.get_word(data, 0)
# print(f"模具号的值为: {value1}")
# 读DB201.DBX2.1
data = client.read_area(snap7.types.Areas.DB,201,2,1)
new_value7 = util.get_bool(data,0,1)
print(f"整线运行状态: {new_value7},时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 检查value7从False变为True的情况
if new_value7 is False and value7 is True:
start_time = datetime.datetime.now()
# 检查value7从True变为False的情况
elif new_value7 is True and value7 is False and start_time is not None:
end_time = datetime.datetime.now()
time_difference = end_time - start_time
total_seconds = time_difference.total_seconds()
minutes, seconds = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
print(f"时间差为 {hours} 小时 {minutes} 分钟 {str(seconds)[0:4} 秒")
webhook_url = '自己的钉钉机器人API'
markdown_text =(
### Hello
f'**当前模具号**:{value1}\n\n'
f'**生产数量为**:{value2}\n\n'
f'**当前时间为**:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}\n\n'
f'**停台时间**:{hours}小时{minutes}分钟{str(seconds)[0:4}秒')
# 构建POST数据
data = {
"msgtype": "markdown",
"markdown": {
"title": "Hello", # 可选,消息标题
"text": markdown_text,
},
}
response = requests.post(webhook_url, headers={'Content-Type': 'application/json'}, data=json.dumps(data))
# 更新当前的value7值
value7 = new_value7
except Exception as e:
print(f"发生错误: {e}. 尝试重新开始...")
continue # 遇到任何异常时,跳过本次循环,直接进行下一次循环重新开始
finally:
# 确保每次循环结束后都尝试断开连接,防止资源泄露
try:
client = snap7.client.Client()
client.disconnect()
client.destroy()
except (AttributeError):
pass