Files
monibuca/scripts/packet_replayer.py

296 lines
13 KiB
Python

#!/usr/bin/env python3
import argparse
from scapy.all import rdpcap, IP, TCP, UDP, Raw, send, sr1, sr, PcapReader
import sys
import time
from collections import defaultdict
import random
import threading
import queue
import socket
import heapq
class PacketReplayer:
def __init__(self, pcap_file, target_ip, target_port):
self.pcap_file = pcap_file
self.target_ip = target_ip
self.target_port = target_port
self.connections = defaultdict(list) # 存储每个连接的包序列
self.response_queue = queue.Queue()
self.stop_reading = threading.Event()
self.socket = None
self.next_seq = None # 下一个期望的序列号
self.pending_packets = [] # 使用优先队列存储待发送的包
self.seen_packets = set() # 用于去重
self.initial_seq = None # 初始序列号
self.initial_ack = None # 初始确认号
self.client_ip = None # 客户端IP
self.client_port = None # 客户端端口
self.first_data_packet = True # 标记是否是第一个数据包
self.total_packets_sent = 0 # 发送的数据包数量
self.total_bytes_sent = 0 # 发送的总字节数
# 添加时间控制相关属性
self.first_packet_time = None # 第一个包的时间戳
self.use_original_timing = True # 是否使用原始时间间隔
self.last_packet_time = None # 上一个包的时间戳
def establish_tcp_connection(self, src_port):
"""建立TCP连接"""
print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...")
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 不绑定源端口,让系统自动分配
self.socket.settimeout(5)
self.socket.connect((self.target_ip, self.target_port))
actual_port = self.socket.getsockname()[1]
print(f"使用本地端口: {actual_port}")
print("TCP连接已建立")
return True
except Exception as e:
print(f"建立连接失败: {e}")
if self.socket:
self.socket.close()
self.socket = None
return False
def process_packet(self, packet, src_ip=None, src_port=None, protocol=None):
"""处理单个数据包"""
if IP not in packet:
return
if src_ip and packet[IP].src != src_ip:
return
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
elif not protocol:
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
return
conn_id = (packet[IP].src, packet[TCP].sport)
self.connections[conn_id].append(packet)
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
return
conn_id = (packet[IP].src, packet[UDP].sport)
self.connections[conn_id].append(packet)
def send_packet(self, packet, packet_count):
"""发送单个数据包,处理序列号"""
if TCP not in packet or IP not in packet:
return True
try:
# 检查是否是发送到目标端口的包
if packet[TCP].dport == self.target_port:
# 记录客户端信息
if self.client_ip is None:
self.client_ip = packet[IP].src
self.client_port = packet[TCP].sport
print(f"识别到客户端: {self.client_ip}:{self.client_port}")
# 获取TCP序列号和确认号
seq = packet[TCP].seq
ack = packet[TCP].ack
flags = packet[TCP].flags
# 打印数据包信息
print(f"[序号:{packet_count}] 处理数据包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}, seq={seq}, ack={ack}, flags={flags}")
# 发送当前包
if Raw in packet:
# 如果是第一个数据包,记录初始序列号
if self.first_data_packet:
self.initial_seq = seq
self.next_seq = seq
self.first_data_packet = False
print(f"第一个数据包,初始序列号: {seq}")
# 如果是重传包,跳过
if seq in self.seen_packets:
print(f"跳过重传包,序列号: {seq}")
return True
# 如果序列号大于期望的序列号,将包放入待发送队列
if seq > self.next_seq:
print(f"包乱序,放入队列,序列号: {seq}, 期望序列号: {self.next_seq}")
heapq.heappush(self.pending_packets, (seq, packet))
return True
payload = packet[Raw].load
print(f"准备发送数据包,负载大小: {len(payload)} 字节")
self.socket.send(payload)
self.seen_packets.add(seq)
old_seq = self.next_seq
self.next_seq = self.next_seq + len(payload)
print(f"更新序列号: {old_seq} -> {self.next_seq}")
# 更新统计信息
self.total_packets_sent += 1
self.total_bytes_sent += len(payload)
# 检查并发送待发送队列中的包
while self.pending_packets and self.pending_packets[0][0] == self.next_seq:
_, next_packet = heapq.heappop(self.pending_packets)
if Raw in next_packet:
next_payload = next_packet[Raw].load
print(f"发送队列中的包,负载大小: {len(next_payload)} 字节")
self.socket.send(next_payload)
self.seen_packets.add(self.next_seq)
old_seq = self.next_seq
self.next_seq += len(next_payload)
print(f"更新序列号: {old_seq} -> {self.next_seq}")
# 更新统计信息
self.total_packets_sent += 1
self.total_bytes_sent += len(next_payload)
packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time)))
print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (序列号: {seq}, 负载大小: {len(payload)} 字节)")
else:
# 对于控制包,只记录到已处理集合
if flags & 0x02: # SYN
print(f"[序号:{packet_count}] 处理SYN包")
elif flags & 0x10: # ACK
print(f"[序号:{packet_count}] 处理ACK包")
else:
print(f"[序号:{packet_count}] 跳过无负载包")
else:
print(f"[序号:{packet_count}] 跳过非目标端口的包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}")
return True
except Exception as e:
print(f"发送数据包 {packet_count} 时出错: {e}")
return False
def response_reader(self, src_port):
"""持续读取服务器响应的线程函数"""
while not self.stop_reading.is_set() and self.socket:
try:
data = self.socket.recv(4096)
if data:
self.response_queue.put(data)
print(f"收到响应: {len(data)} 字节")
except socket.timeout:
continue
except Exception as e:
if not self.stop_reading.is_set():
print(f"读取响应时出错: {e}")
break
time.sleep(0.1)
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
"""边读取边重放数据包"""
print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}")
print(f"使用原始时间间隔发送数据包")
try:
reader = PcapReader(self.pcap_file)
packet_count = 0
connection_established = False
for packet in reader:
packet_count += 1
if IP not in packet:
continue
if src_ip and packet[IP].src != src_ip:
continue
current_src_port = None
if protocol == 'tcp' and TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif protocol == 'udp' and UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
elif not protocol:
if TCP in packet:
if src_port and packet[TCP].sport != src_port:
continue
current_src_port = packet[TCP].sport
elif UDP in packet:
if src_port and packet[UDP].sport != src_port:
continue
current_src_port = packet[UDP].sport
else:
continue
else:
continue
if not connection_established:
if not self.establish_tcp_connection(current_src_port):
print("无法建立连接,退出")
return
self.stop_reading.clear()
reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,))
reader_thread.daemon = True
reader_thread.start()
connection_established = True
# 处理时间间隔
current_time = float(packet.time)
if self.first_packet_time is None:
self.first_packet_time = current_time
self.last_packet_time = current_time
elif self.use_original_timing:
# 计算与上一个包的时间差
time_diff = current_time - self.last_packet_time
if time_diff > 0:
print(f"等待 {time_diff:.3f} 秒后发送下一个包...")
time.sleep(time_diff)
self.last_packet_time = current_time
if not self.send_packet(packet, packet_count):
print("发送数据包失败,退出")
return
if delay > 0:
time.sleep(delay)
print(f"\n统计信息:")
print(f"总共处理了 {packet_count} 个数据包")
print(f"成功发送了 {self.total_packets_sent} 个数据包")
print(f"总共发送了 {self.total_bytes_sent} 字节数据")
if self.first_packet_time is not None:
total_time = float(self.last_packet_time - self.first_packet_time)
print(f"总耗时: {total_time:.3f}")
print(f"平均发送速率: {self.total_bytes_sent / total_time:.2f} 字节/秒")
except Exception as e:
print(f"处理数据包时出错: {e}")
sys.exit(1)
finally:
self.stop_reading.set()
if self.socket:
self.socket.close()
self.socket = None
reader.close()
def main():
parser = argparse.ArgumentParser(description='Wireshark数据包重放工具')
parser.add_argument('pcap_file', help='pcap文件路径')
parser.add_argument('target_ip', help='目标IP地址')
parser.add_argument('target_port', type=int, help='目标端口')
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
parser.add_argument('--src-ip', help='过滤源IP地址')
parser.add_argument('--src-port', type=int, help='过滤源端口')
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
args = parser.parse_args()
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
if __name__ == '__main__':
main()