其中的端口扫描程序用的是shadow1ng/fscan: 一款内网综合扫描工具,方便一键自动化、全方位漏扫扫描。 (github.com)
由于不是自己的扫描程序,使用总是有不方便的地方。 于是决定用最近正在学习的Python写了一段端口扫描程序。
[Python] 纯文本查看 复制代码import socket
import concurrent.futures
import ipaddress
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义要扫描的端口范围
port_range = [135]
# 定义线程池大小
thread_pool_size = 200
def scan_port(ip, port):
#print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
if result == 0:
return str(ip) + ":" + str(port)
except:
pass
finally:
if sock:
sock.close()
def scan_subnet(subnet):
ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
print(ips)
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
futures = [executor.submit(scan_port, ip, port) for ip in ips for port in port_range]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
port = future.result()
if port is not None:
print(port)
if __name__ == "__main__":
scan_subnet(subnet)
我们内网网段有4096个地址,实际使用不超过10%,上面代码却要扫描整个网段,感觉比较浪费资源,又做了下面这个,先用Ping做存活检测,然后对存活终端做端口扫描。
[Python] 纯文本查看 复制代码import subprocess
import os
import sys
import re
import concurrent.futures
import ipaddress
import socket
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135]
def scan_port(ip, port):
#print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
if result == 0:
return str(ip) + ":" + str(port)
except:
pass
finally:
if sock:
sock.close()
def scan_subnet(subnet):
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
port = future.result()
if port is not None:
print(port)
def PingIP(ip):
try:
p = subprocess.Popen(['ping','-n','1','-w','20',ip],
stdout=subprocess.PIPE,
stdin = subprocess.PIPE,
stderr = subprocess.PIPE,
shell = True)
output = p.stdout.read().decode("gbk").upper()
if "TTL" in output:
return(ip)
else:
pass
except:
pass
def checkLive(subnet):
ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
iplist=[]
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
futures = [executor.submit(PingIP, ip) for ip in ips]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
ip = future.result()
if ip is not None:
iplist.append(ip)
print(iplist)
scan_subnet(iplist)
if __name__ == "__main__":
checkLive(subnets)
但这程序运行后CPU直接拉满,检测速度也比不做活检慢了好几倍。于是想到用ARP做活检。
[Python] 纯文本查看 复制代码import os
import sys
import time
from scapy.all import ARP, Ether, srp
import concurrent.futures
import socket
# 定义要扫描的网段
subnet = "192.168.118.0/24"
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]
# 定义线程池大小
thread_pool_size = 200
def scan_port(ip, port):
#print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
if result == 0:
return str(ip) + ":" + str(port)
except:
pass
finally:
if sock:
sock.close()
def scan_subnet(subnet):
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
port = future.result()
if port is not None:
print(port)
def arpscan(subnet):
arp_request = ARP(pdst=subnet)
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = ether / arp_request
answered_list = srp(arp_request_broadcast, timeout=1, verbose=False)[0]
clients = []
for packet in answered_list:
ip = packet[1].psrc
clients.append(ip)
scan_subnet(clients)
if __name__ == "__main__":
arpscan(subnet)
v
ARP做本网段存活检测确实快,但研究半天才发现ARP不支持跨网段,活检还得想办法用Ping,直到找到了https://blog.csdn.net/Small_Teenager/article/details/122123299
[Python] 纯文本查看 复制代码# encoding:utf-8
import time
import struct
import socket
import select
import concurrent.futures
import ipaddress
#Ping程序代码来自https://blog.csdn.net/Small_Teenager/article/details/122123299
# 定义要扫描的网段
subnet = "192.168.112.0/20"
# 定义线程池大小
thread_pool_size = 200
# 定义要扫描的端口范围
port_range = [135,445,3306,3389,6379,22]
def scan_port(ip, port):
#print("正在扫描" + str(ip) + "的端口:" + str(port) +"\n")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
if result == 0:
return str(ip) + ":" + str(port)
except:
pass
finally:
if sock:
sock.close()
def scan_subnet(subnet):
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
futures = [executor.submit(scan_port, ip, port) for ip in subnet for port in port_range]
concurrent.futures.wait(futures)
for future in concurrent.futures.as_completed(futures):
port = future.result()
if port is not None:
print(port)
def chesksum(data):
n = len(data)
m = n % 2
sum = 0
for i in range(0, n - m ,2):
sum += (data) + ((data[i+1]) > 16) + (sum & 0xffff)
sum += (sum >> 16) #如果还有高于16位,将继续与低16位相加
answer = ~sum & 0xffff
# 主机字节序转网络字节序列(参考小端序转大端序)
answer = answer >> 8 | (answer BBHHH32s',data_type,data_code,data_checksum,data_ID,data_Sequence,payload_body)
icmp_chesksum = chesksum(icmp_packet) #获取校验和
# 把校验和传入,再次打包
icmp_packet = struct.pack('>BBHHH32s',data_type,data_code,icmp_chesksum,data_ID,data_Sequence,payload_body)
return icmp_packet
def raw_socket(dst_addr,icmp_packet):
'''
连接套接字,并将数据发送到套接字
'''
#实例化一个socket对象,ipv4,原套接字,分配协议端口
rawsocket = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.getprotobyname("icmp"))
#记录当前请求时间
send_request_ping_time = time.time()
#发送数据到网络
rawsocket.sendto(icmp_packet,(dst_addr,80))
#返回数据
return send_request_ping_time,rawsocket,dst_addr
def reply_ping(send_request_ping_time,rawsocket,data_Sequence,timeout = 2):
while True:
#开始时间
started_select = time.time()
#实例化select对象,可读rawsocket,可写为空,可执行为空,超时时间
what_ready = select.select([rawsocket], [], [], timeout)
#等待时间
wait_for_time = (time.time() - started_select)
#没有返回可读的内容,判断超时
if what_ready[0] == []: # Timeout
return -1
#记录接收时间
time_received = time.time()
#设置接收的包的字节为1024
received_packet, addr = rawsocket.recvfrom(1024)
#获取接收包的icmp头
#print(icmpHeader)
icmpHeader = received_packet[20:28]
#反转编码
type, code, checksum, packet_id, sequence = struct.unpack(
">BBHHH", icmpHeader
)
if type == 0 and sequence == data_Sequence:
return time_received - send_request_ping_time
#数据包的超时时间判断
timeout = timeout - wait_for_time
if timeout 0:
#print("来自 {0} 的回复: 字节=32 时间={1}ms".format(addr,int(times*1000)))
return host
else:
#print("请求超时。")
pass
def StartPing(subnet):
# 将网段转换为IP地址列表
ips = [str(ip) for ip in list(ipaddress.IPv4Network(subnet).hosts())]
print(ips)
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_pool_size) as executor:
# 对于每个IP地址和端口,提交扫描任务到线程池
futures = [executor.submit(ping, ip) for ip in ips]
# 等待所有扫描任务完成
concurrent.futures.wait(futures)
# 打印开放的端口号
iplist = []
for future in concurrent.futures.as_completed(futures):
ip = future.result()
if ip is not None:
iplist.append(ip)
scan_subnet(iplist)
if __name__ == "__main__":
StartPing(subnet)
利用Python实现Ping后,资源占用明显降低,扫描一个端口不如不做活检直接扫描,但扫描多个端口速度优势就体现出来了。