mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-26 23:05:55 +08:00
fix: improve packet replayer
This commit is contained in:
@@ -9,288 +9,523 @@ import threading
|
|||||||
import queue
|
import queue
|
||||||
import socket
|
import socket
|
||||||
import heapq
|
import heapq
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
class PacketReplayer:
|
class PacketReplayer:
|
||||||
def __init__(self, pcap_file, target_ip, target_port):
|
def __init__(self, pcap_file, target_ip, target_port):
|
||||||
self.pcap_file = pcap_file
|
self.pcap_file = pcap_file
|
||||||
self.target_ip = target_ip
|
self.target_ip = target_ip
|
||||||
self.target_port = target_port
|
self.target_port = target_port
|
||||||
self.connections = defaultdict(list) # 存储每个连接的包序列
|
self.connections = defaultdict(list) # 存<EFBFBD><EFBFBD>每个连接的包序列
|
||||||
self.response_queue = queue.Queue()
|
self.response_queue = queue.Queue()
|
||||||
self.stop_reading = threading.Event()
|
self.stop_reading = threading.Event()
|
||||||
self.socket = None
|
self.socket = None
|
||||||
self.next_seq = None # 下一个期望的序列号
|
|
||||||
self.pending_packets = [] # 使用优先队列存储待发送的包
|
|
||||||
self.seen_packets = set() # 用于去重
|
|
||||||
self.initial_seq = None # 初始序列号
|
|
||||||
self.initial_ack = None # 初始确认号
|
|
||||||
self.client_ip = None # 客户端IP
|
|
||||||
self.client_port = None # 客户端端口
|
|
||||||
self.first_data_packet = True # 标记是否是第一个数据包
|
|
||||||
self.total_packets_sent = 0 # 发送的数据包数量
|
self.total_packets_sent = 0 # 发送的数据包数量
|
||||||
self.total_bytes_sent = 0 # 发送的总字节数
|
self.total_bytes_sent = 0 # 发送的总字节数
|
||||||
# 添加时间控制相关属性
|
# 添加时间控制相关属性
|
||||||
self.first_packet_time = None # 第一个包的时间戳
|
self.first_packet_time = None # 第一个包的时间戳
|
||||||
self.use_original_timing = True # 是否使用原始时间间隔
|
self.use_original_timing = True # 是否使用原始时间间隔
|
||||||
self.last_packet_time = None # 上一个包的时间戳
|
self.start_time = None # 重放开始时间
|
||||||
|
self.last_activity_time = None # 最后活动时间
|
||||||
|
self.keepalive_interval = 30.0 # 保活间隔(秒)
|
||||||
|
self.connection_timeout = 60.0 # 连接超时时间(秒)
|
||||||
|
# 简化的数据包管理
|
||||||
|
self.data_packets = [] # 按时间顺序存储所有数据包
|
||||||
|
self.processed_count = 0
|
||||||
|
|
||||||
|
def log_with_timestamp(self, message):
|
||||||
|
"""带时间戳的日志输出"""
|
||||||
|
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||||||
|
print(f"[{timestamp}] {message}")
|
||||||
|
|
||||||
def establish_tcp_connection(self, src_port):
|
def establish_tcp_connection(self, src_port):
|
||||||
"""建立TCP连接"""
|
"""建立TCP连接"""
|
||||||
print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...")
|
self.log_with_timestamp(f"正在建<EFBFBD><EFBFBD><EFBFBD>TCP连接 {self.target_ip}:{self.target_port}...")
|
||||||
try:
|
try:
|
||||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
# 不绑定源端口,让系统自动分配
|
# 设置socket选项
|
||||||
self.socket.settimeout(5)
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||||
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||||
|
|
||||||
|
# 设置较大的缓冲区
|
||||||
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2*1024*1024) # 2MB发送缓冲区
|
||||||
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2*1024*1024) # 2MB接收缓冲区
|
||||||
|
|
||||||
|
self.socket.settimeout(self.connection_timeout)
|
||||||
self.socket.connect((self.target_ip, self.target_port))
|
self.socket.connect((self.target_ip, self.target_port))
|
||||||
actual_port = self.socket.getsockname()[1]
|
actual_port = self.socket.getsockname()[1]
|
||||||
print(f"使用本地端口: {actual_port}")
|
self.last_activity_time = time.time()
|
||||||
print("TCP连接已建立")
|
self.log_with_timestamp(f"使用本地端口: {actual_port}")
|
||||||
|
self.log_with_timestamp("TCP连接已建立")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"建立连接失败: {e}")
|
self.log_with_timestamp(f"建立连接失败: {e}")
|
||||||
if self.socket:
|
if self.socket:
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
self.socket = None
|
self.socket = None
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def process_packet(self, packet, src_ip=None, src_port=None, protocol=None):
|
def load_packets(self, src_ip=None, src_port=None, protocol=None):
|
||||||
"""处理单个数据包"""
|
"""预加载所有数据包,改进数据包收集逻辑"""
|
||||||
if IP not in packet:
|
self.log_with_timestamp("开始加载数据包...")
|
||||||
return
|
|
||||||
|
|
||||||
if src_ip and packet[IP].src != src_ip:
|
|
||||||
return
|
|
||||||
|
|
||||||
if protocol == 'tcp' and TCP in packet:
|
|
||||||
if src_port and packet[TCP].sport != src_port:
|
|
||||||
return
|
|
||||||
conn_id = (packet[IP].src, packet[TCP].sport)
|
|
||||||
self.connections[conn_id].append(packet)
|
|
||||||
elif protocol == 'udp' and UDP in packet:
|
|
||||||
if src_port and packet[UDP].sport != src_port:
|
|
||||||
return
|
|
||||||
conn_id = (packet[IP].src, packet[UDP].sport)
|
|
||||||
self.connections[conn_id].append(packet)
|
|
||||||
elif not protocol:
|
|
||||||
if TCP in packet:
|
|
||||||
if src_port and packet[TCP].sport != src_port:
|
|
||||||
return
|
|
||||||
conn_id = (packet[IP].src, packet[TCP].sport)
|
|
||||||
self.connections[conn_id].append(packet)
|
|
||||||
elif UDP in packet:
|
|
||||||
if src_port and packet[UDP].sport != src_port:
|
|
||||||
return
|
|
||||||
conn_id = (packet[IP].src, packet[UDP].sport)
|
|
||||||
self.connections[conn_id].append(packet)
|
|
||||||
|
|
||||||
def send_packet(self, packet, packet_count):
|
|
||||||
"""发送单个数据包,处理序列号"""
|
|
||||||
if TCP not in packet or IP not in packet:
|
|
||||||
return True
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 检查是否是发送到目标端口的包
|
|
||||||
if packet[TCP].dport == self.target_port:
|
|
||||||
# 记录客户端信息
|
|
||||||
if self.client_ip is None:
|
|
||||||
self.client_ip = packet[IP].src
|
|
||||||
self.client_port = packet[TCP].sport
|
|
||||||
print(f"识别到客户端: {self.client_ip}:{self.client_port}")
|
|
||||||
|
|
||||||
# 获取TCP序列号和确认号
|
|
||||||
seq = packet[TCP].seq
|
|
||||||
ack = packet[TCP].ack
|
|
||||||
flags = packet[TCP].flags
|
|
||||||
|
|
||||||
# 打印数据包信息
|
|
||||||
print(f"[序号:{packet_count}] 处理数据包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}, seq={seq}, ack={ack}, flags={flags}")
|
|
||||||
|
|
||||||
# 发送当前包
|
|
||||||
if Raw in packet:
|
|
||||||
# 如果是第一个数据包,记录初始序列号
|
|
||||||
if self.first_data_packet:
|
|
||||||
self.initial_seq = seq
|
|
||||||
self.next_seq = seq
|
|
||||||
self.first_data_packet = False
|
|
||||||
print(f"第一个数据包,初始序列号: {seq}")
|
|
||||||
|
|
||||||
# 如果是重传包,跳过
|
|
||||||
if seq in self.seen_packets:
|
|
||||||
print(f"跳过重传包,序列号: {seq}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
# 如果序列号大于期望的序列号,将包放入待发送队列
|
|
||||||
if seq > self.next_seq:
|
|
||||||
print(f"包乱序,放入队列,序列号: {seq}, 期望序列号: {self.next_seq}")
|
|
||||||
heapq.heappush(self.pending_packets, (seq, packet))
|
|
||||||
return True
|
|
||||||
|
|
||||||
payload = packet[Raw].load
|
|
||||||
print(f"准备发送数据包,负载大小: {len(payload)} 字节")
|
|
||||||
self.socket.send(payload)
|
|
||||||
self.seen_packets.add(seq)
|
|
||||||
old_seq = self.next_seq
|
|
||||||
self.next_seq = self.next_seq + len(payload)
|
|
||||||
print(f"更新序列号: {old_seq} -> {self.next_seq}")
|
|
||||||
|
|
||||||
# 更新统计信息
|
|
||||||
self.total_packets_sent += 1
|
|
||||||
self.total_bytes_sent += len(payload)
|
|
||||||
|
|
||||||
# 检查并发送待发送队列中的包
|
|
||||||
while self.pending_packets and self.pending_packets[0][0] == self.next_seq:
|
|
||||||
_, next_packet = heapq.heappop(self.pending_packets)
|
|
||||||
if Raw in next_packet:
|
|
||||||
next_payload = next_packet[Raw].load
|
|
||||||
print(f"发送队列中的包,负载大小: {len(next_payload)} 字节")
|
|
||||||
self.socket.send(next_payload)
|
|
||||||
self.seen_packets.add(self.next_seq)
|
|
||||||
old_seq = self.next_seq
|
|
||||||
self.next_seq += len(next_payload)
|
|
||||||
print(f"更新序列号: {old_seq} -> {self.next_seq}")
|
|
||||||
|
|
||||||
# 更新统计信息
|
|
||||||
self.total_packets_sent += 1
|
|
||||||
self.total_bytes_sent += len(next_payload)
|
|
||||||
|
|
||||||
packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time)))
|
|
||||||
print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (序列号: {seq}, 负载大小: {len(payload)} 字节)")
|
|
||||||
else:
|
|
||||||
# 对于控制包,只记录到已处理集合
|
|
||||||
if flags & 0x02: # SYN
|
|
||||||
print(f"[序号:{packet_count}] 处理SYN包")
|
|
||||||
elif flags & 0x10: # ACK
|
|
||||||
print(f"[序号:{packet_count}] 处理ACK包")
|
|
||||||
else:
|
|
||||||
print(f"[序号:{packet_count}] 跳过无负载包")
|
|
||||||
else:
|
|
||||||
print(f"[序号:{packet_count}] 跳过非目标端口的包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
print(f"发送数据包 {packet_count} 时出错: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def response_reader(self, src_port):
|
|
||||||
"""持续读取服务器响应的线程函数"""
|
|
||||||
while not self.stop_reading.is_set() and self.socket:
|
|
||||||
try:
|
|
||||||
data = self.socket.recv(4096)
|
|
||||||
if data:
|
|
||||||
self.response_queue.put(data)
|
|
||||||
print(f"收到响应: {len(data)} 字节")
|
|
||||||
except socket.timeout:
|
|
||||||
continue
|
|
||||||
except Exception as e:
|
|
||||||
if not self.stop_reading.is_set():
|
|
||||||
print(f"读取响应时出错: {e}")
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
|
|
||||||
"""边读取边重放数据包"""
|
|
||||||
print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}")
|
|
||||||
print(f"使用原始时间间隔发送数据包")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reader = PcapReader(self.pcap_file)
|
reader = PcapReader(self.pcap_file)
|
||||||
packet_count = 0
|
packet_count = 0
|
||||||
connection_established = False
|
total_pcap_packets = 0
|
||||||
|
handshake_packets = []
|
||||||
|
|
||||||
for packet in reader:
|
for packet in reader:
|
||||||
packet_count += 1
|
total_pcap_packets += 1
|
||||||
|
|
||||||
if IP not in packet:
|
if IP not in packet or TCP not in packet:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
packet_count += 1
|
||||||
|
|
||||||
|
# 更宽松的过滤条件
|
||||||
if src_ip and packet[IP].src != src_ip:
|
if src_ip and packet[IP].src != src_ip:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
current_src_port = None
|
if src_port and packet[TCP].sport != src_port:
|
||||||
if protocol == 'tcp' and TCP in packet:
|
|
||||||
if src_port and packet[TCP].sport != src_port:
|
|
||||||
continue
|
|
||||||
current_src_port = packet[TCP].sport
|
|
||||||
elif protocol == 'udp' and UDP in packet:
|
|
||||||
if src_port and packet[UDP].sport != src_port:
|
|
||||||
continue
|
|
||||||
current_src_port = packet[UDP].sport
|
|
||||||
elif not protocol:
|
|
||||||
if TCP in packet:
|
|
||||||
if src_port and packet[TCP].sport != src_port:
|
|
||||||
continue
|
|
||||||
current_src_port = packet[TCP].sport
|
|
||||||
elif UDP in packet:
|
|
||||||
if src_port and packet[UDP].sport != src_port:
|
|
||||||
continue
|
|
||||||
current_src_port = packet[UDP].sport
|
|
||||||
else:
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not connection_established:
|
# 检查是否是目标连接的数据包(双向)
|
||||||
if not self.establish_tcp_connection(current_src_port):
|
is_target_connection = (
|
||||||
print("无法建立连接,退出")
|
(packet[TCP].dport == self.target_port) or # 发送到目标端口
|
||||||
return
|
(packet[TCP].sport == self.target_port) # 从目标端口返回
|
||||||
self.stop_reading.clear()
|
)
|
||||||
reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,))
|
|
||||||
reader_thread.daemon = True
|
if is_target_connection and Raw in packet:
|
||||||
reader_thread.start()
|
payload = packet[Raw].load
|
||||||
connection_established = True
|
packet_info = {
|
||||||
|
'timestamp': float(packet.time),
|
||||||
# 处理时间间隔
|
'payload': payload,
|
||||||
current_time = float(packet.time)
|
'seq': packet[TCP].seq,
|
||||||
if self.first_packet_time is None:
|
'packet_count': packet_count,
|
||||||
self.first_packet_time = current_time
|
'direction': 'to_server' if packet[TCP].dport == self.target_port else 'from_server'
|
||||||
self.last_packet_time = current_time
|
}
|
||||||
elif self.use_original_timing:
|
|
||||||
# 计算与上一个包的时间差
|
# 检查是否是RTMP握手包(前几个包通常是握手)
|
||||||
time_diff = current_time - self.last_packet_time
|
if len(self.data_packets) < 20:
|
||||||
if time_diff > 0:
|
handshake_packets.append(packet_info)
|
||||||
print(f"等待 {time_diff:.3f} 秒后发送下一个包...")
|
|
||||||
time.sleep(time_diff)
|
# 只收集发送到服务器的包用于重放
|
||||||
self.last_packet_time = current_time
|
if packet[TCP].dport == self.target_port:
|
||||||
|
self.data_packets.append(packet_info)
|
||||||
if not self.send_packet(packet, packet_count):
|
|
||||||
print("发送数据包失败,退出")
|
# 按时间戳排序
|
||||||
return
|
self.data_packets.sort(key=lambda x: x['timestamp'])
|
||||||
|
|
||||||
if delay > 0:
|
total_data_size = sum(len(p['payload']) for p in self.data_packets)
|
||||||
time.sleep(delay)
|
|
||||||
|
self.log_with_timestamp(f"PCAP文件总包数: {total_pcap_packets}")
|
||||||
print(f"\n统计信息:")
|
self.log_with_timestamp(f"TCP包数: {packet_count}")
|
||||||
print(f"总共处理了 {packet_count} 个数据包")
|
self.log_with_timestamp(f"发现握手包: {len(handshake_packets)}")
|
||||||
print(f"成功发送了 {self.total_packets_sent} 个数据包")
|
self.log_with_timestamp(f"待发送数据包: {len(self.data_packets)}")
|
||||||
print(f"总共发送了 {self.total_bytes_sent} 字节数据")
|
self.log_with_timestamp(f"总数据量: {total_data_size / (1024*1024):.2f} MB")
|
||||||
if self.first_packet_time is not None:
|
|
||||||
total_time = float(self.last_packet_time - self.first_packet_time)
|
# 分析第一个包,检查是否是RTMP握手
|
||||||
print(f"总耗时: {total_time:.3f} 秒")
|
if self.data_packets:
|
||||||
print(f"平均发送速率: {self.total_bytes_sent / total_time:.2f} 字节/秒")
|
first_packet = self.data_packets[0]
|
||||||
|
if len(first_packet['payload']) >= 4:
|
||||||
|
first_bytes = first_packet['payload'][:4]
|
||||||
|
self.log_with_timestamp(f"第一个包前4字节: {first_bytes.hex()}")
|
||||||
|
|
||||||
|
# RTMP握手C0包通常是03开头
|
||||||
|
if first_bytes[0] == 0x03:
|
||||||
|
self.log_with_timestamp("检测到RTMP握手包")
|
||||||
|
else:
|
||||||
|
self.log_with_timestamp("警告:第一个包可能不是RTMP握手包")
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"处理数据包时出错: {e}")
|
self.log_with_timestamp(f"加载数据包时出错: {e}")
|
||||||
sys.exit(1)
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_data_with_flow_control(self, data, max_chunk_size=1024):
|
||||||
|
"""发送数据,使用极其严格的流控制来避免RTMP协议错误"""
|
||||||
|
if not self.socket:
|
||||||
|
return False
|
||||||
|
|
||||||
|
total_sent = 0
|
||||||
|
data_len = len(data)
|
||||||
|
|
||||||
|
# 对于大数据包,强制增加延迟
|
||||||
|
if data_len > 2000:
|
||||||
|
time.sleep(0.02) # 大包延迟20ms
|
||||||
|
elif data_len > 1000:
|
||||||
|
time.sleep(0.01) # 中包延迟10ms
|
||||||
|
else:
|
||||||
|
time.sleep(0.005) # 小包延迟5ms
|
||||||
|
|
||||||
|
while total_sent < data_len:
|
||||||
|
try:
|
||||||
|
# 使用很小的块大小,确保不会超过RTMP消息边界
|
||||||
|
remaining = data_len - total_sent
|
||||||
|
chunk_size = min(max_chunk_size, remaining)
|
||||||
|
|
||||||
|
chunk = data[total_sent:total_sent + chunk_size]
|
||||||
|
sent = self.socket.send(chunk)
|
||||||
|
|
||||||
|
if sent == 0:
|
||||||
|
self.log_with_timestamp("连接已断开")
|
||||||
|
return False
|
||||||
|
|
||||||
|
total_sent += sent
|
||||||
|
self.last_activity_time = time.time()
|
||||||
|
|
||||||
|
# 如果没有完全发送当前块,继续发送剩余部分
|
||||||
|
if sent < len(chunk):
|
||||||
|
self.log_with_timestamp(f"部分发送: {sent}/{len(chunk)} 字节")
|
||||||
|
|
||||||
|
# 每个块之间都要延迟,给RTMP协议栈处理时间
|
||||||
|
if total_sent < data_len:
|
||||||
|
time.sleep(0.002) # 每块之间2ms延迟
|
||||||
|
|
||||||
|
# 每发送512字节检查一次服务器响应
|
||||||
|
if total_sent % 512 == 0:
|
||||||
|
try:
|
||||||
|
self.socket.settimeout(0.001)
|
||||||
|
response = self.socket.recv(1024)
|
||||||
|
if response:
|
||||||
|
self.response_queue.put(response)
|
||||||
|
self.last_activity_time = time.time()
|
||||||
|
# 收到响应后稍微等待,让服务器处理
|
||||||
|
time.sleep(0.001)
|
||||||
|
except socket.timeout:
|
||||||
|
pass # 没有响应数据
|
||||||
|
except Exception:
|
||||||
|
pass # 忽略其他错误
|
||||||
|
finally:
|
||||||
|
self.socket.settimeout(self.connection_timeout)
|
||||||
|
|
||||||
|
except socket.timeout:
|
||||||
|
self.log_with_timestamp(f"发送超时,已发送 {total_sent}/{data_len} 字节")
|
||||||
|
return False
|
||||||
|
except socket.error as e:
|
||||||
|
self.log_with_timestamp(f"发送错误: {e}, 已发送 {total_sent}/{data_len} 字节")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 发送完一个完整包后,给服务器更多处理时间
|
||||||
|
time.sleep(0.005) # 包间延迟5ms
|
||||||
|
return True
|
||||||
|
|
||||||
|
def response_reader(self):
|
||||||
|
"""持续读取服务器响应"""
|
||||||
|
while not self.stop_reading.is_set() and self.socket:
|
||||||
|
try:
|
||||||
|
self.socket.settimeout(5.0)
|
||||||
|
data = self.socket.recv(8192)
|
||||||
|
if data:
|
||||||
|
self.response_queue.put(data)
|
||||||
|
self.last_activity_time = time.time()
|
||||||
|
self.log_with_timestamp(f"收到响应: {len(data)} 字节")
|
||||||
|
else:
|
||||||
|
self.log_with_timestamp("服务器关闭了连接")
|
||||||
|
break
|
||||||
|
except socket.timeout:
|
||||||
|
# 检查连接是否长时间无活动
|
||||||
|
current_time = time.time()
|
||||||
|
if self.last_activity_time and (current_time - self.last_activity_time) > self.keepalive_interval:
|
||||||
|
self.log_with_timestamp("连接长时间无活动,可能已断开")
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
if not self.stop_reading.is_set():
|
||||||
|
self.log_with_timestamp(f"读取响应时出错: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
def calculate_timing(self, current_packet_time):
|
||||||
|
"""计算时间间隔"""
|
||||||
|
if self.first_packet_time is None:
|
||||||
|
self.first_packet_time = current_packet_time
|
||||||
|
self.start_time = time.time()
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# 计算应该等待的时间
|
||||||
|
elapsed_in_capture = current_packet_time - self.first_packet_time
|
||||||
|
elapsed_in_replay = time.time() - self.start_time
|
||||||
|
wait_time = elapsed_in_capture - elapsed_in_replay
|
||||||
|
|
||||||
|
# 限制最大等待时间
|
||||||
|
max_wait = 5.0
|
||||||
|
if wait_time > max_wait:
|
||||||
|
self.log_with_timestamp(f"等待时间过长 ({wait_time:.3f}s),限制为 {max_wait}s")
|
||||||
|
wait_time = max_wait
|
||||||
|
|
||||||
|
return max(0, wait_time)
|
||||||
|
|
||||||
|
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
|
||||||
|
"""重放数据包,采用正确的TCP流重组方式"""
|
||||||
|
# 首先加载所有数据包
|
||||||
|
if not self.load_packets(src_ip, src_port, protocol):
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self.data_packets:
|
||||||
|
self.log_with_timestamp("没有找到可发送的数据包")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 正确<E6ADA3><E7A1AE><EFBFBD>TCP流重组 - 按序列号重组!
|
||||||
|
self.log_with_timestamp("重组TCP流...")
|
||||||
|
|
||||||
|
# 第一步:按序列号排序
|
||||||
|
self.data_packets.sort(key=lambda x: x['seq'])
|
||||||
|
self.log_with_timestamp(f"按序列号排序完成,共 {len(self.data_packets)} 个数据包")
|
||||||
|
|
||||||
|
# 第二步:正确重组TCP流
|
||||||
|
tcp_segments = []
|
||||||
|
expected_seq = self.data_packets[0]['seq']
|
||||||
|
self.log_with_timestamp(f"开始TCP重组,初始序列号: {expected_seq}")
|
||||||
|
|
||||||
|
processed_packets = 0
|
||||||
|
skipped_packets = 0
|
||||||
|
|
||||||
|
for packet_info in self.data_packets:
|
||||||
|
seq = packet_info['seq']
|
||||||
|
payload = packet_info['payload']
|
||||||
|
|
||||||
|
if seq == expected_seq:
|
||||||
|
# 序列号正确,添加到流中
|
||||||
|
tcp_segments.append(payload)
|
||||||
|
expected_seq += len(payload)
|
||||||
|
processed_packets += 1
|
||||||
|
elif seq < expected_seq:
|
||||||
|
# 处理重叠数据
|
||||||
|
overlap = expected_seq - seq
|
||||||
|
if len(payload) > overlap:
|
||||||
|
# 去除重叠部分,添加剩余数据
|
||||||
|
tcp_segments.append(payload[overlap:])
|
||||||
|
expected_seq = seq + len(payload)
|
||||||
|
processed_packets += 1
|
||||||
|
else:
|
||||||
|
# 完全重叠,跳过
|
||||||
|
skipped_packets += 1
|
||||||
|
else:
|
||||||
|
# 序列号大于期望值,说明有数据包丢失
|
||||||
|
gap = seq - expected_seq
|
||||||
|
self.log_with_timestamp(f"检测到数据包间隙: {gap} 字节,从序列号 {expected_seq} 到 {seq}")
|
||||||
|
# 跳过间隙,继续处理
|
||||||
|
tcp_segments.append(payload)
|
||||||
|
expected_seq = seq + len(payload)
|
||||||
|
processed_packets += 1
|
||||||
|
|
||||||
|
# 合并所有段
|
||||||
|
tcp_stream = b''.join(tcp_segments)
|
||||||
|
|
||||||
|
self.log_with_timestamp(f"TCP流重组完成:")
|
||||||
|
self.log_with_timestamp(f" - 处理了 {processed_packets} 个数据包")
|
||||||
|
self.log_with_timestamp(f" - 跳过了 {skipped_packets} 个重复包")
|
||||||
|
self.log_with_timestamp(f" - 重组后大小: {len(tcp_stream) / (1024*1024):.2f} MB")
|
||||||
|
|
||||||
|
# 验证RTMP握手
|
||||||
|
if len(tcp_stream) >= 4:
|
||||||
|
first_bytes = tcp_stream[:4]
|
||||||
|
self.log_with_timestamp(f"重组流前4字节: {first_bytes.hex()}")
|
||||||
|
if first_bytes[0] == 0x03:
|
||||||
|
self.log_with_timestamp("重组流包含正确的RTMP握手")
|
||||||
|
else:
|
||||||
|
self.log_with_timestamp("警告:重组流可能不是有效的RTMP流")
|
||||||
|
|
||||||
|
# 建立连接
|
||||||
|
if not self.establish_tcp_connection(None):
|
||||||
|
self.log_with_timestamp("无法建立连接")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 启动响应读取线程
|
||||||
|
self.stop_reading.clear()
|
||||||
|
reader_thread = threading.Thread(target=self.response_reader)
|
||||||
|
reader_thread.daemon = True
|
||||||
|
reader_thread.start()
|
||||||
|
|
||||||
|
self.log_with_timestamp(f"开始发送TCP流数据")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 使用更小的块大小,确保RTMP协议完整性
|
||||||
|
chunk_size = 1024 # 减小到1KB,更安全
|
||||||
|
sent_bytes = 0
|
||||||
|
chunk_count = 0
|
||||||
|
last_progress_time = time.time()
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
|
while sent_bytes < len(tcp_stream):
|
||||||
|
current_time = time.time()
|
||||||
|
|
||||||
|
# 每5秒显示一次进度
|
||||||
|
if current_time - last_progress_time >= 5.0:
|
||||||
|
progress = (sent_bytes / len(tcp_stream)) * 100
|
||||||
|
self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB)")
|
||||||
|
last_progress_time = current_time
|
||||||
|
|
||||||
|
# 计算当前块大小
|
||||||
|
remaining = len(tcp_stream) - sent_bytes
|
||||||
|
current_chunk_size = min(chunk_size, remaining)
|
||||||
|
|
||||||
|
# 发送数据块
|
||||||
|
chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size]
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.socket.settimeout(10) # 10秒超时
|
||||||
|
bytes_sent = self.socket.send(chunk)
|
||||||
|
|
||||||
|
if bytes_sent == 0:
|
||||||
|
self.log_with_timestamp("连接已断开")
|
||||||
|
break
|
||||||
|
|
||||||
|
sent_bytes += bytes_sent
|
||||||
|
chunk_count += 1
|
||||||
|
self.total_bytes_sent += bytes_sent
|
||||||
|
self.last_activity_time = time.time()
|
||||||
|
|
||||||
|
# 如果没有完全发送当前块,继续发送剩余部分
|
||||||
|
if bytes_sent < len(chunk):
|
||||||
|
self.log_with_timestamp(f"部分发送: {bytes_sent}/{len(chunk)} 字节")
|
||||||
|
continue
|
||||||
|
|
||||||
|
except socket.timeout:
|
||||||
|
self.log_with_timestamp(f"发送数据块超时,继续...")
|
||||||
|
continue
|
||||||
|
except socket.error as e:
|
||||||
|
self.log_with_timestamp(f"发送数据块失败: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 更温和的流控制
|
||||||
|
if chunk_count % 50 == 0: # 每50个块休眠
|
||||||
|
time.sleep(0.005) # 5ms休眠
|
||||||
|
|
||||||
|
self.log_with_timestamp(f"数据发送完成,等待服务器处理...")
|
||||||
|
time.sleep(3) # 等待服务器处理完成
|
||||||
|
|
||||||
|
self.log_with_timestamp(f"\n=== 发送完成 ===")
|
||||||
|
self.log_with_timestamp(f"成功发送了 {chunk_count} 个数据块")
|
||||||
|
self.log_with_timestamp(f"总共发送了 {self.total_bytes_sent} 字节数据 ({self.total_bytes_sent / (1024*1024):.2f} MB)")
|
||||||
|
if self.start_time:
|
||||||
|
total_time = time.time() - self.start_time
|
||||||
|
self.log_with_timestamp(f"总耗时: {total_time:.3f} 秒")
|
||||||
|
if total_time > 0:
|
||||||
|
self.log_with_timestamp(f"平均发送速率: {(self.total_bytes_sent / total_time) / (1024*1024):.2f} MB/s")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log_with_timestamp(f"重放过程中出错: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
finally:
|
finally:
|
||||||
self.stop_reading.set()
|
self.stop_reading.set()
|
||||||
if self.socket:
|
if self.socket:
|
||||||
|
self.log_with_timestamp("关闭TCP连接...")
|
||||||
|
try:
|
||||||
|
self.socket.shutdown(socket.SHUT_RDWR)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
self.socket = None
|
self.socket = None
|
||||||
|
|
||||||
|
def list_all_connections(self):
|
||||||
|
"""列出所有连接"""
|
||||||
|
self.log_with_timestamp("分析PCAP文件,列出所有连接...")
|
||||||
|
try:
|
||||||
|
reader = PcapReader(self.pcap_file)
|
||||||
|
connections = defaultdict(lambda: defaultdict(int))
|
||||||
|
|
||||||
|
for packet in reader:
|
||||||
|
if IP not in packet or TCP not in packet:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 统计每个源端口和目标端口的流量
|
||||||
|
src_port = packet[TCP].sport
|
||||||
|
dst_port = packet[TCP].dport
|
||||||
|
payload_size = len(packet[Raw].load) if Raw in packet else 0
|
||||||
|
|
||||||
|
connections[dst_port]['sent'] += payload_size
|
||||||
|
connections[src_port]['received'] += payload_size
|
||||||
|
|
||||||
reader.close()
|
reader.close()
|
||||||
|
|
||||||
|
# 按照发送和接收的字节数排序
|
||||||
|
sorted_connections = sorted(connections.items(), key=lambda item: item[1]['sent'] + item[1]['received'], reverse=True)
|
||||||
|
|
||||||
|
self.log_with_timestamp("发现的连接:")
|
||||||
|
for port, stats in sorted_connections:
|
||||||
|
self.log_with_timestamp(f"端口 {port}: 发送 {stats['sent']} 字节, 接收 {stats['received']} 字节")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log_with_timestamp(f"列出连接时出错: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
def find_largest_connection(self, src_ip=None):
|
||||||
|
"""自动选择数据量最大的连接"""
|
||||||
|
self.log_with_timestamp("正在查找数据量最大的连接...")
|
||||||
|
try:
|
||||||
|
reader = PcapReader(self.pcap_file)
|
||||||
|
connections = defaultdict(int)
|
||||||
|
|
||||||
|
for packet in reader:
|
||||||
|
if IP not in packet or TCP not in packet:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 只统计发送到目标IP的数据
|
||||||
|
if packet[IP].dst != self.target_ip:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 统计每个源端口的流量
|
||||||
|
src_port = packet[TCP].sport
|
||||||
|
payload_size = len(packet[Raw].load) if Raw in packet else 0
|
||||||
|
connections[src_port] += payload_size
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
|
||||||
|
# 找到数据量最大的源端口
|
||||||
|
if connections:
|
||||||
|
largest_port = max(connections.items(), key=lambda item: item[1])[0]
|
||||||
|
self.log_with_timestamp(f"数据量最大的连接: 源端口 {largest_port}")
|
||||||
|
return largest_port
|
||||||
|
else:
|
||||||
|
self.log_with_timestamp("未找到合适的连接")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log_with_timestamp(f"查找最大连接时出错: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return None
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Wireshark数据包重放工具')
|
parser = argparse.ArgumentParser(description='RTMP数据包重放工具 - 支持自动选择最大连接')
|
||||||
parser.add_argument('pcap_file', help='pcap文件路径')
|
parser.add_argument('pcap_file', help='pcap文件路径')
|
||||||
parser.add_argument('target_ip', help='目标IP地址')
|
parser.add_argument('target_ip', help='目标IP地址')
|
||||||
parser.add_argument('target_port', type=int, help='目标端口')
|
parser.add_argument('target_port', type=int, help='目标端口')
|
||||||
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
|
parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)')
|
||||||
parser.add_argument('--src-ip', help='过滤源IP地址')
|
parser.add_argument('--src-ip', help='过滤源IP地址')
|
||||||
parser.add_argument('--src-port', type=int, help='过滤源端口')
|
parser.add_argument('--src-port', type=int, help='过滤源端口 (留空自动选择最大连接)')
|
||||||
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
|
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
|
||||||
|
parser.add_argument('--auto-select', action='store_true', help='自动选择数据量最大的连接')
|
||||||
|
parser.add_argument('--list-connections', action='store_true', help='列出所有连接并退出')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
|
replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port)
|
||||||
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
|
|
||||||
|
# 如果指定了列出连接,就分析并显示所有连接
|
||||||
|
if args.list_connections:
|
||||||
|
replayer.list_all_connections()
|
||||||
|
return
|
||||||
|
|
||||||
|
# 如果启用自动选择或没有指定源端口,选择最大的连接
|
||||||
|
if args.auto_select or args.src_port is None:
|
||||||
|
best_port = replayer.find_largest_connection(args.src_ip)
|
||||||
|
if best_port:
|
||||||
|
print(f"自动选择数据量最大的连接: 端口 {best_port}")
|
||||||
|
replayer.replay_packets(args.src_ip, best_port, args.protocol, args.delay)
|
||||||
|
else:
|
||||||
|
print("未找到合适的连接")
|
||||||
|
else:
|
||||||
|
replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
126
scripts/pcap_analyzer.py
Normal file
126
scripts/pcap_analyzer.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
from scapy.all import rdpcap, IP, TCP, UDP, Raw, PcapReader
|
||||||
|
import sys
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
def analyze_pcap(pcap_file, target_port=1935):
|
||||||
|
"""分析pcap文件的详细信息"""
|
||||||
|
print(f"分析PCAP文件: {pcap_file}")
|
||||||
|
print(f"目标端口: {target_port}")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
try:
|
||||||
|
reader = PcapReader(pcap_file)
|
||||||
|
|
||||||
|
# 统计信息
|
||||||
|
total_packets = 0
|
||||||
|
tcp_packets = 0
|
||||||
|
udp_packets = 0
|
||||||
|
other_packets = 0
|
||||||
|
|
||||||
|
# 连接统计
|
||||||
|
connections = defaultdict(lambda: {'packets': 0, 'bytes': 0, 'to_target': 0, 'from_target': 0})
|
||||||
|
target_connections = defaultdict(lambda: {'packets': 0, 'bytes': 0})
|
||||||
|
|
||||||
|
# 数据大小统计
|
||||||
|
total_tcp_data = 0
|
||||||
|
target_tcp_data = 0
|
||||||
|
|
||||||
|
# IP统计
|
||||||
|
ip_stats = defaultdict(lambda: {'send_bytes': 0, 'recv_bytes': 0, 'packets': 0})
|
||||||
|
|
||||||
|
for packet in reader:
|
||||||
|
total_packets += 1
|
||||||
|
|
||||||
|
if IP in packet:
|
||||||
|
src_ip = packet[IP].src
|
||||||
|
dst_ip = packet[IP].dst
|
||||||
|
|
||||||
|
if TCP in packet:
|
||||||
|
tcp_packets += 1
|
||||||
|
src_port = packet[TCP].sport
|
||||||
|
dst_port = packet[TCP].dport
|
||||||
|
|
||||||
|
conn_key = f"{src_ip}:{src_port} -> {dst_ip}:{dst_port}"
|
||||||
|
connections[conn_key]['packets'] += 1
|
||||||
|
|
||||||
|
if Raw in packet:
|
||||||
|
payload_size = len(packet[Raw].load)
|
||||||
|
connections[conn_key]['bytes'] += payload_size
|
||||||
|
total_tcp_data += payload_size
|
||||||
|
|
||||||
|
# 统计IP流量
|
||||||
|
ip_stats[src_ip]['send_bytes'] += payload_size
|
||||||
|
ip_stats[dst_ip]['recv_bytes'] += payload_size
|
||||||
|
ip_stats[src_ip]['packets'] += 1
|
||||||
|
|
||||||
|
# 检查是否与目标端口相关
|
||||||
|
if dst_port == target_port:
|
||||||
|
connections[conn_key]['to_target'] += payload_size
|
||||||
|
target_tcp_data += payload_size
|
||||||
|
target_key = f"{src_ip}:{src_port}"
|
||||||
|
target_connections[target_key]['packets'] += 1
|
||||||
|
target_connections[target_key]['bytes'] += payload_size
|
||||||
|
elif src_port == target_port:
|
||||||
|
connections[conn_key]['from_target'] += payload_size
|
||||||
|
|
||||||
|
elif UDP in packet:
|
||||||
|
udp_packets += 1
|
||||||
|
else:
|
||||||
|
other_packets += 1
|
||||||
|
else:
|
||||||
|
other_packets += 1
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
|
||||||
|
# 输出统计结果
|
||||||
|
print(f"总包数: {total_packets}")
|
||||||
|
print(f"TCP包数: {tcp_packets}")
|
||||||
|
print(f"UDP包数: {udp_packets}")
|
||||||
|
print(f"其他包数: {other_packets}")
|
||||||
|
print(f"TCP总数据量: {total_tcp_data / (1024*1024):.2f} MB")
|
||||||
|
print(f"目标端口({target_port})数据量: {target_tcp_data / (1024*1024):.2f} MB")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# 显示IP统计
|
||||||
|
print("IP地址统计:")
|
||||||
|
print("-" * 60)
|
||||||
|
for ip, stats in sorted(ip_stats.items(), key=lambda x: x[1]['send_bytes'], reverse=True)[:10]:
|
||||||
|
send_mb = stats['send_bytes'] / (1024*1024)
|
||||||
|
recv_mb = stats['recv_bytes'] / (1024*1024)
|
||||||
|
print(f"{ip:15} 发送: {send_mb:8.2f}MB 接收: {recv_mb:8.2f}MB 包数: {stats['packets']}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# 显示目标端口连接统计
|
||||||
|
print(f"目标端口({target_port})连接统计:")
|
||||||
|
print("-" * 60)
|
||||||
|
for conn, stats in sorted(target_connections.items(), key=lambda x: x[1]['bytes'], reverse=True):
|
||||||
|
mb = stats['bytes'] / (1024*1024)
|
||||||
|
print(f"{conn:25} 包数: {stats['packets']:6} 数据量: {mb:8.2f}MB")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# 显示前10个最大的连接
|
||||||
|
print("前10个最大的连接:")
|
||||||
|
print("-" * 80)
|
||||||
|
for conn, stats in sorted(connections.items(), key=lambda x: x[1]['bytes'], reverse=True)[:10]:
|
||||||
|
mb = stats['bytes'] / (1024*1024)
|
||||||
|
to_target_mb = stats['to_target'] / (1024*1024)
|
||||||
|
from_target_mb = stats['from_target'] / (1024*1024)
|
||||||
|
print(f"{conn:40} 总量: {mb:8.2f}MB 发往目标: {to_target_mb:6.2f}MB 来自目标: {from_target_mb:6.2f}MB")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"分析PCAP文件时出错: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description='PCAP文件分析工具')
|
||||||
|
parser.add_argument('pcap_file', help='pcap文件路径')
|
||||||
|
parser.add_argument('--target-port', type=int, default=1935, help='目标端口 (默认: 1935)')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
analyze_pcap(args.pcap_file, args.target_port)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Reference in New Issue
Block a user