1.在iplist.csv中维护好IP、port、所属系统、站点描述、系统站点等信息;
2.检测结果输出到iplist_out.csv中。
以下是源代码部分
import socket
import csv
import time
socket.setdefaulttimeout(3)
class NetworkCheck():
def __init__(self,ipfile,outfile):
self.ipfile = ipfile
self.outfile = outfile
#self.timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')
#------IP端口检测-----
def socketTest(self,ip,port):
#res = ''
port = int(port)
timestr = time.strftime('%Y-%m-%d %H:%M:%S %p')
#print('%s - CONNECTING %s:%i' % (timestr, ip, port))
#print('%s - CONNECTING %s:%i \n' % (timestr, ip, port))
if port > 65536:
print('--Error: Port should less than 65535')
res = 'Error'
else:
try:
for num in range(3): # 循环检测3次
s = socket.socket()
result = s.connect_ex((ip, port))
if result == 0:
print('{}:{}--OK'.format(ip,port))
res = 'OK'
break # 如果检测成功,则跳出循环
else:
print('{}:{}--FAIL'.format(ip,port))
res = 'FAIL'
continue # 如果检测不通则间隔10秒,继续检测
except Exception as e:
print('异常未执行:{}'.format(e))
res = '异常未执行'
finally:
s.close()
return res
#----------执行函数------------------
def main(self):
csvfile = csv.reader(open(self.ipfile, 'r'))
ipInput = []
ipOut = []
# READ FROM INPUT CSV FILE
for row in csvfile:
if row[0].lower() == 'ip':
continue
else:
ipInput.append((row[0], row[1], row[2], row[3], row[4]))
# TEST CONNECTION
for ip, port, soft, detial, node in ipInput:
timestr = time.strftime('%Y-%m-%d %H:%M:%S %p') # 当前时间
res = self.socketTest(ip, port)
#print(res)
if res == 'FAIL':
ipOut.append([soft, node, detial, ip, port, res, timestr])
# WRITE TO OUTPUT CSV FILE
out = open(self.outfile, 'w', newline='')
csv_write = csv.writer(out, dialect='excel')
csv_write.writerow(['所属系统', '系统站点', '站点描述', 'ip', 'port', '测试结果', '测试时间'])
for item in ipOut:
csv_write.writerow(item)
out.close()
print("OUTPUT FILE: %s\n" % self.outfile)
#------主函数-----
if __name__ == '__main__':
ipfile = 'iplist.csv'
outfile = 'iplist_out.csv'
start_time = time.time()
so = NetworkCheck(ipfile,outfile)
re = so.main()