Windows和Linux上的包嗅探

操作系统共性

绝大多数操作系统在处理UDP闭合端口时存在共性行为,当发送UDP数据包到主机某个关闭的UDP端口上时,目的主机通常会返回一个ICMP数据包指示目标端口不可达

有趣的是,发送UDP数据包触发ICMP响应,目标主机发送ICMP数据包时,UDP数据包的IP头也包含在ICMP数据中

混杂模式

一台机器的网卡能够接收所有经过它的数据流,而不论其目的地址是否是它,一般计算机网卡都工作在非混杂模式下,此时网卡只接受来自网络端口的目的地址指向自己的数据。当网卡工作在混杂模式下时,网卡将来自接口的所有数据都捕获并交给相应的驱动程序,即不验证MAC地址,Linux开启混杂模式需要root权限,Windows则需要开启IOCTL设置

import socket
import os

# 监听所有网卡
host = '0.0.0.0'

# 创建原始套接字之后绑定在公开接口上,nt为Windows,posix为linux
if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

# Windows允许嗅探所有协议的所有数据包,但Linux只能嗅探到ICMP数据
# 创建原始套接字RAW Socket
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

# 原始套接字不携带端口号
# sniffer.bind((host, 0))

# 设置在捕捉的数据包中包含IP头
# 开启IP_HDRINCL可以从IP报文首部第一个字节开始依次构造整个IP报文的所有选项,不开启则从IP首部之后的第一个字节开始
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# 在Windows平台上,需设置IOCTL以启用混杂模式,以允许嗅探网卡上经过的所有数据包,即使数据目的地址不是本地
# IOCTL是用户隔离模式下与内核模式组件进行通信的方式
# 发送IOCTL信号到网卡驱动,需要管理员权限
# SIO_RCVALL表示接受所有数据
if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 读取单个数据包,65565为IP报文最大长度
print(sniffer.recvfrom(65565))

# 在Windows平台上,关闭IOCTL混杂模式
if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

解码IP层

import socket

import os
import struct
from ctypes import *

host = '0.0.0.0'


# 定义IP头,Python ctypes结构体将接收到的数据的前20字节解析成可读的IP头
class IP(Structure):
    # Structure必须定义_fields_属性,其必须为ctypes类型
    _fields_ = [
        ("ihl", c_ubyte, 4),  # ip head length头长度,后面的4是比特位标志,说明字段按比特位计算
        ("version", c_ubyte, 4),  # 版本号
        ("tos", c_ubyte),  # 服务类型
        ("len", c_ushort),  # IP数据包总长
        ("id", c_ushort),  # 标识符
        ("offset", c_ushort),  # 片偏移
        ("ttl", c_ubyte),  # 生存时间
        ("protocol_num", c_ubyte),  # 协议类型,协议数字,后面有提到
        ("sum", c_ushort),  # 校验和
        ("src", c_uint32),  # 源IP c_uint32 The problem is with 32 vs 64 bit systems
        ("dst", c_uint32)  # 目的IP
    ]

    # 将原始数据中的数据(从网络中接收的数据)填充到结构中
    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    # 当调用__init__方法时,__new__方法已经完成对缓冲区中数据的处理
    # 参数self是对象本身,在__new__那里是class本身
    # 在__init__方法内部对数据进行处理,输出可读性更强的协议类型和IP地址
    def __init__(self, socket_buffer=None):
        super().__init__()
        self.socket_buffer = socket_buffer
        # 协议字段与协议名称对应
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        # 可读性更强的IP地址
        # inet_ntoa将32位的IPV4地址转换为平常看到的点分十进制形式(将网络地址转换成“.”点隔的字符串格式)
        # struct.pack将C结构数据src,dst转为Python字符串
        self.src_address = socket.inet_ntoa(struct.pack("@I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("@I", self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:

    while True:

        # 读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]

        # 将缓冲区前20个字节按IP头进行解析
        ip_header = IP(raw_buffer[:20])

        # 输出协议和通信双方IP地址
        print("Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))

# 处理CTRL-C
except KeyboardInterrupt:

    # 如果运行在Windows上,关闭混杂模式
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

解码ICMP

import socket

import os
# struct用于处理存储在文件或网络连接或其它来源中的二进制数据
import struct
from ctypes import *


class IP(Structure):

    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_ushort),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        ("src", c_uint32),
        ("dst", c_uint32)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        super().__init__()
        self.socket_buffer = socket_buffer
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        self.src_address = socket.inet_ntoa(struct.pack("@I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("@I", self.dst))

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


class ICMP(Structure):

    _fields_ = [
        ("type", c_ubyte),
        ("code", c_ubyte),
        ("checksum", c_ushort),
        ("unused", c_ushort),
        ("next_hop_mtu", c_ushort)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        super(ICMP, self).__init__()
        self.socket_buffer = socket_buffer


if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:

    while True:

        raw_buffer = sniffer.recvfrom(65565)[0]

        ip_header = IP(raw_buffer[:20])

        print("Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))

        # 如果为ICMP,进行处理
        if ip_header.protocol == 'ICMP':

            # 计算ICMP原始数据包偏移 ihl一个字节4比特位
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset: offset + sizeof(ICMP)]

            # 解析ICMP数据
            icmp_header = ICMP(buf)

        print("ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code))

except KeyboardInterrupt:

    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

UDP主机发现工具

import threading
import time
# netaddr库允许对诸如192.168.0.0/24网段进行处理
from netaddr import IPNetwork, IPAddress

import socket
import os
import struct
from ctypes import *

# 监听的主机
host = '10.173.168.16'

# 扫描的目标子网
# /24 = 10.173.168.1 - 10.173.168.254
subnet = '10.173.168.0/24'

# 自定义字符串,将在ICMP响应中进行核对
magic_message = b"PYTHON"


# 批量发送UDP数据包到子网每一个IP地址上
def udp_sender(subnet, magic_message):

    # time.sleep(5)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    for ip in IPNetwork(subnet):

        try:
            # 发送magic_message到子网到每个ip,端口是65212
            sender.sendto(magic_message, ("%s" % ip, 65212))
        except:
            pass


class IP(Structure):

    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_ushort),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        ("src", c_uint32),
        ("dst", c_uint32)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        super().__init__()
        self.socket_buffer = socket_buffer
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        self.src_address = socket.inet_ntoa(struct.pack("@I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("@I", self.dst))

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


class ICMP(Structure):

    _fields_ = [
        ("type", c_ubyte),
        ("code", c_ubyte),
        ("checksum", c_ushort),
        ("unused", c_ushort),
        ("next_hop_mtu", c_ushort)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        super(ICMP, self).__init__()
        self.socket_buffer = socket_buffer


if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 创建线程进行发送数据包,以免影响对响应数据的嗅探
t = threading.Thread(target=udp_sender, args=(subnet, magic_message))
t.start()

try:

    while True:

        raw_buffer = sniffer.recvfrom(65565)[0]
        ip_header = IP(raw_buffer[:20])

        if ip_header.protocol == 'ICMP':

            offset = ip_header.ihl*4
            buf = raw_buffer[offset:offset+sizeof(ICMP)]
            icmp_header = ICMP(buf)

            # 检测是否为目的不可达响应数据包
            if icmp_header.code == 3 and icmp_header.type == 3:

                # 确认响应主机在目标子网内
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):

                    # 确认ICMP数据中包含发送的自定义数据包,此ICMP包为UDP数据包触发
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print("HOST UP: %s" % ip_header.src_address)

except KeyboardInterrupt:

    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)