diff --git a/scripts/packet_replayer.py b/scripts/packet_replayer.py index 7e647f7..be59f25 100644 --- a/scripts/packet_replayer.py +++ b/scripts/packet_replayer.py @@ -9,288 +9,523 @@ import threading import queue import socket import heapq +from datetime import datetime class PacketReplayer: def __init__(self, pcap_file, target_ip, target_port): self.pcap_file = pcap_file self.target_ip = target_ip self.target_port = target_port - self.connections = defaultdict(list) # 存储每个连接的包序列 + self.connections = defaultdict(list) # 存��每个连接的包序列 self.response_queue = queue.Queue() self.stop_reading = threading.Event() self.socket = None - self.next_seq = None # 下一个期望的序列号 - self.pending_packets = [] # 使用优先队列存储待发送的包 - self.seen_packets = set() # 用于去重 - self.initial_seq = None # 初始序列号 - self.initial_ack = None # 初始确认号 - self.client_ip = None # 客户端IP - self.client_port = None # 客户端端口 - self.first_data_packet = True # 标记是否是第一个数据包 self.total_packets_sent = 0 # 发送的数据包数量 self.total_bytes_sent = 0 # 发送的总字节数 # 添加时间控制相关属性 self.first_packet_time = None # 第一个包的时间戳 self.use_original_timing = True # 是否使用原始时间间隔 - self.last_packet_time = None # 上一个包的时间戳 + self.start_time = None # 重放开始时间 + self.last_activity_time = None # 最后活动时间 + self.keepalive_interval = 30.0 # 保活间隔(秒) + self.connection_timeout = 60.0 # 连接超时时间(秒) + # 简化的数据包管理 + self.data_packets = [] # 按时间顺序存储所有数据包 + self.processed_count = 0 + + def log_with_timestamp(self, message): + """带时间戳的日志输出""" + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + print(f"[{timestamp}] {message}") def establish_tcp_connection(self, src_port): """建立TCP连接""" - print(f"正在建立TCP连接 {self.target_ip}:{self.target_port}...") + self.log_with_timestamp(f"正在建���TCP连接 {self.target_ip}:{self.target_port}...") try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # 不绑定源端口,让系统自动分配 - self.socket.settimeout(5) + # 设置socket选项 + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # 设置较大的缓冲区 + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2*1024*1024) # 2MB发送缓冲区 + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2*1024*1024) # 2MB接收缓冲区 + + self.socket.settimeout(self.connection_timeout) self.socket.connect((self.target_ip, self.target_port)) actual_port = self.socket.getsockname()[1] - print(f"使用本地端口: {actual_port}") - print("TCP连接已建立") + self.last_activity_time = time.time() + self.log_with_timestamp(f"使用本地端口: {actual_port}") + self.log_with_timestamp("TCP连接已建立") return True except Exception as e: - print(f"建立连接失败: {e}") + self.log_with_timestamp(f"建立连接失败: {e}") if self.socket: self.socket.close() self.socket = None return False - def process_packet(self, packet, src_ip=None, src_port=None, protocol=None): - """处理单个数据包""" - if IP not in packet: - return + def load_packets(self, src_ip=None, src_port=None, protocol=None): + """预加载所有数据包,改进数据包收集逻辑""" + self.log_with_timestamp("开始加载数据包...") - if src_ip and packet[IP].src != src_ip: - return - - if protocol == 'tcp' and TCP in packet: - if src_port and packet[TCP].sport != src_port: - return - conn_id = (packet[IP].src, packet[TCP].sport) - self.connections[conn_id].append(packet) - elif protocol == 'udp' and UDP in packet: - if src_port and packet[UDP].sport != src_port: - return - conn_id = (packet[IP].src, packet[UDP].sport) - self.connections[conn_id].append(packet) - elif not protocol: - if TCP in packet: - if src_port and packet[TCP].sport != src_port: - return - conn_id = (packet[IP].src, packet[TCP].sport) - self.connections[conn_id].append(packet) - elif UDP in packet: - if src_port and packet[UDP].sport != src_port: - return - conn_id = (packet[IP].src, packet[UDP].sport) - self.connections[conn_id].append(packet) - - def send_packet(self, packet, packet_count): - """发送单个数据包,处理序列号""" - if TCP not in packet or IP not in packet: - return True - - try: - # 检查是否是发送到目标端口的包 - if packet[TCP].dport == self.target_port: - # 记录客户端信息 - if self.client_ip is None: - self.client_ip = packet[IP].src - self.client_port = packet[TCP].sport - print(f"识别到客户端: {self.client_ip}:{self.client_port}") - - # 获取TCP序列号和确认号 - seq = packet[TCP].seq - ack = packet[TCP].ack - flags = packet[TCP].flags - - # 打印数据包信息 - print(f"[序号:{packet_count}] 处理数据包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}, seq={seq}, ack={ack}, flags={flags}") - - # 发送当前包 - if Raw in packet: - # 如果是第一个数据包,记录初始序列号 - if self.first_data_packet: - self.initial_seq = seq - self.next_seq = seq - self.first_data_packet = False - print(f"第一个数据包,初始序列号: {seq}") - - # 如果是重传包,跳过 - if seq in self.seen_packets: - print(f"跳过重传包,序列号: {seq}") - return True - - # 如果序列号大于期望的序列号,将包放入待发送队列 - if seq > self.next_seq: - print(f"包乱序,放入队列,序列号: {seq}, 期望序列号: {self.next_seq}") - heapq.heappush(self.pending_packets, (seq, packet)) - return True - - payload = packet[Raw].load - print(f"准备发送数据包,负载大小: {len(payload)} 字节") - self.socket.send(payload) - self.seen_packets.add(seq) - old_seq = self.next_seq - self.next_seq = self.next_seq + len(payload) - print(f"更新序列号: {old_seq} -> {self.next_seq}") - - # 更新统计信息 - self.total_packets_sent += 1 - self.total_bytes_sent += len(payload) - - # 检查并发送待发送队列中的包 - while self.pending_packets and self.pending_packets[0][0] == self.next_seq: - _, next_packet = heapq.heappop(self.pending_packets) - if Raw in next_packet: - next_payload = next_packet[Raw].load - print(f"发送队列中的包,负载大小: {len(next_payload)} 字节") - self.socket.send(next_payload) - self.seen_packets.add(self.next_seq) - old_seq = self.next_seq - self.next_seq += len(next_payload) - print(f"更新序列号: {old_seq} -> {self.next_seq}") - - # 更新统计信息 - self.total_packets_sent += 1 - self.total_bytes_sent += len(next_payload) - - packet_time = time.strftime("%H:%M:%S", time.localtime(float(packet.time))) - print(f"[{packet_time}] [序号:{packet_count}] 已发送数据包 (序列号: {seq}, 负载大小: {len(payload)} 字节)") - else: - # 对于控制包,只记录到已处理集合 - if flags & 0x02: # SYN - print(f"[序号:{packet_count}] 处理SYN包") - elif flags & 0x10: # ACK - print(f"[序号:{packet_count}] 处理ACK包") - else: - print(f"[序号:{packet_count}] 跳过无负载包") - else: - print(f"[序号:{packet_count}] 跳过非目标端口的包: src={packet[IP].src}:{packet[TCP].sport} -> dst={packet[IP].dst}:{packet[TCP].dport}") - return True - except Exception as e: - print(f"发送数据包 {packet_count} 时出错: {e}") - return False - - def response_reader(self, src_port): - """持续读取服务器响应的线程函数""" - while not self.stop_reading.is_set() and self.socket: - try: - data = self.socket.recv(4096) - if data: - self.response_queue.put(data) - print(f"收到响应: {len(data)} 字节") - except socket.timeout: - continue - except Exception as e: - if not self.stop_reading.is_set(): - print(f"读取响应时出错: {e}") - break - time.sleep(0.1) - - def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0): - """边读取边重放数据包""" - print(f"开始读取并重放数据包到 {self.target_ip}:{self.target_port}") - print(f"使用原始时间间隔发送数据包") - try: reader = PcapReader(self.pcap_file) packet_count = 0 - connection_established = False - + total_pcap_packets = 0 + handshake_packets = [] + for packet in reader: - packet_count += 1 - - if IP not in packet: + total_pcap_packets += 1 + + if IP not in packet or TCP not in packet: continue - + + packet_count += 1 + + # 更宽松的过滤条件 if src_ip and packet[IP].src != src_ip: continue - - current_src_port = None - if protocol == 'tcp' and TCP in packet: - if src_port and packet[TCP].sport != src_port: - continue - current_src_port = packet[TCP].sport - elif protocol == 'udp' and UDP in packet: - if src_port and packet[UDP].sport != src_port: - continue - current_src_port = packet[UDP].sport - elif not protocol: - if TCP in packet: - if src_port and packet[TCP].sport != src_port: - continue - current_src_port = packet[TCP].sport - elif UDP in packet: - if src_port and packet[UDP].sport != src_port: - continue - current_src_port = packet[UDP].sport - else: - continue - else: + + if src_port and packet[TCP].sport != src_port: continue - - if not connection_established: - if not self.establish_tcp_connection(current_src_port): - print("无法建立连接,退出") - return - self.stop_reading.clear() - reader_thread = threading.Thread(target=self.response_reader, args=(current_src_port,)) - reader_thread.daemon = True - reader_thread.start() - connection_established = True - - # 处理时间间隔 - current_time = float(packet.time) - if self.first_packet_time is None: - self.first_packet_time = current_time - self.last_packet_time = current_time - elif self.use_original_timing: - # 计算与上一个包的时间差 - time_diff = current_time - self.last_packet_time - if time_diff > 0: - print(f"等待 {time_diff:.3f} 秒后发送下一个包...") - time.sleep(time_diff) - self.last_packet_time = current_time - - if not self.send_packet(packet, packet_count): - print("发送数据包失败,退出") - return - - if delay > 0: - time.sleep(delay) - - print(f"\n统计信息:") - print(f"总共处理了 {packet_count} 个数据包") - print(f"成功发送了 {self.total_packets_sent} 个数据包") - print(f"总共发送了 {self.total_bytes_sent} 字节数据") - if self.first_packet_time is not None: - total_time = float(self.last_packet_time - self.first_packet_time) - print(f"总耗时: {total_time:.3f} 秒") - print(f"平均发送速率: {self.total_bytes_sent / total_time:.2f} 字节/秒") - + + # 检查是否是目标连接的数据包(双向) + is_target_connection = ( + (packet[TCP].dport == self.target_port) or # 发送到目标端口 + (packet[TCP].sport == self.target_port) # 从目标端口返回 + ) + + if is_target_connection and Raw in packet: + payload = packet[Raw].load + packet_info = { + 'timestamp': float(packet.time), + 'payload': payload, + 'seq': packet[TCP].seq, + 'packet_count': packet_count, + 'direction': 'to_server' if packet[TCP].dport == self.target_port else 'from_server' + } + + # 检查是否是RTMP握手包(前几个包通常是握手) + if len(self.data_packets) < 20: + handshake_packets.append(packet_info) + + # 只收集发送到服务器的包用于重放 + if packet[TCP].dport == self.target_port: + self.data_packets.append(packet_info) + + # 按时间戳排序 + self.data_packets.sort(key=lambda x: x['timestamp']) + + total_data_size = sum(len(p['payload']) for p in self.data_packets) + + self.log_with_timestamp(f"PCAP文件总包数: {total_pcap_packets}") + self.log_with_timestamp(f"TCP包数: {packet_count}") + self.log_with_timestamp(f"发现握手包: {len(handshake_packets)}") + self.log_with_timestamp(f"待发送数据包: {len(self.data_packets)}") + self.log_with_timestamp(f"总数据量: {total_data_size / (1024*1024):.2f} MB") + + # 分析第一个包,检查是否是RTMP握手 + if self.data_packets: + first_packet = self.data_packets[0] + if len(first_packet['payload']) >= 4: + first_bytes = first_packet['payload'][:4] + self.log_with_timestamp(f"第一个包前4字节: {first_bytes.hex()}") + + # RTMP握手C0包通常是03开头 + if first_bytes[0] == 0x03: + self.log_with_timestamp("检测到RTMP握手包") + else: + self.log_with_timestamp("警告:第一个包可能不是RTMP握手包") + + reader.close() + return True + except Exception as e: - print(f"处理数据包时出错: {e}") - sys.exit(1) + self.log_with_timestamp(f"加载数据包时出错: {e}") + import traceback + traceback.print_exc() + return False + + def send_data_with_flow_control(self, data, max_chunk_size=1024): + """发送数据,使用极其严格的流控制来避免RTMP协议错误""" + if not self.socket: + return False + + total_sent = 0 + data_len = len(data) + + # 对于大数据包,强制增加延迟 + if data_len > 2000: + time.sleep(0.02) # 大包延迟20ms + elif data_len > 1000: + time.sleep(0.01) # 中包延迟10ms + else: + time.sleep(0.005) # 小包延迟5ms + + while total_sent < data_len: + try: + # 使用很小的块大小,确保不会超过RTMP消息边界 + remaining = data_len - total_sent + chunk_size = min(max_chunk_size, remaining) + + chunk = data[total_sent:total_sent + chunk_size] + sent = self.socket.send(chunk) + + if sent == 0: + self.log_with_timestamp("连接已断开") + return False + + total_sent += sent + self.last_activity_time = time.time() + + # 如果没有完全发送当前块,继续发送剩余部分 + if sent < len(chunk): + self.log_with_timestamp(f"部分发送: {sent}/{len(chunk)} 字节") + + # 每个块之间都要延迟,给RTMP协议栈处理时间 + if total_sent < data_len: + time.sleep(0.002) # 每块之间2ms延迟 + + # 每发送512字节检查一次服务器响应 + if total_sent % 512 == 0: + try: + self.socket.settimeout(0.001) + response = self.socket.recv(1024) + if response: + self.response_queue.put(response) + self.last_activity_time = time.time() + # 收到响应后稍微等待,让服务器处理 + time.sleep(0.001) + except socket.timeout: + pass # 没有响应数据 + except Exception: + pass # 忽略其他错误 + finally: + self.socket.settimeout(self.connection_timeout) + + except socket.timeout: + self.log_with_timestamp(f"发送超时,已发送 {total_sent}/{data_len} 字节") + return False + except socket.error as e: + self.log_with_timestamp(f"发送错误: {e}, 已发送 {total_sent}/{data_len} 字节") + return False + + # 发送完一个完整包后,给服务器更多处理时间 + time.sleep(0.005) # 包间延迟5ms + return True + + def response_reader(self): + """持续读取服务器响应""" + while not self.stop_reading.is_set() and self.socket: + try: + self.socket.settimeout(5.0) + data = self.socket.recv(8192) + if data: + self.response_queue.put(data) + self.last_activity_time = time.time() + self.log_with_timestamp(f"收到响应: {len(data)} 字节") + else: + self.log_with_timestamp("服务器关闭了连接") + break + except socket.timeout: + # 检查连接是否长时间无活动 + current_time = time.time() + if self.last_activity_time and (current_time - self.last_activity_time) > self.keepalive_interval: + self.log_with_timestamp("连接长时间无活动,可能已断开") + break + continue + except Exception as e: + if not self.stop_reading.is_set(): + self.log_with_timestamp(f"读取响应时出错: {e}") + break + + def calculate_timing(self, current_packet_time): + """计算时间间隔""" + if self.first_packet_time is None: + self.first_packet_time = current_packet_time + self.start_time = time.time() + return 0 + + # 计算应该等待的时间 + elapsed_in_capture = current_packet_time - self.first_packet_time + elapsed_in_replay = time.time() - self.start_time + wait_time = elapsed_in_capture - elapsed_in_replay + + # 限制最大等待时间 + max_wait = 5.0 + if wait_time > max_wait: + self.log_with_timestamp(f"等待时间过长 ({wait_time:.3f}s),限制为 {max_wait}s") + wait_time = max_wait + + return max(0, wait_time) + + def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0): + """重放数据包,采用正确的TCP流重组方式""" + # 首先加载所有数据包 + if not self.load_packets(src_ip, src_port, protocol): + return + + if not self.data_packets: + self.log_with_timestamp("没有找到可发送的数据包") + return + + # 正确���TCP流重组 - 按序列号重组! + self.log_with_timestamp("重组TCP流...") + + # 第一步:按序列号排序 + self.data_packets.sort(key=lambda x: x['seq']) + self.log_with_timestamp(f"按序列号排序完成,共 {len(self.data_packets)} 个数据包") + + # 第二步:正确重组TCP流 + tcp_segments = [] + expected_seq = self.data_packets[0]['seq'] + self.log_with_timestamp(f"开始TCP重组,初始序列号: {expected_seq}") + + processed_packets = 0 + skipped_packets = 0 + + for packet_info in self.data_packets: + seq = packet_info['seq'] + payload = packet_info['payload'] + + if seq == expected_seq: + # 序列号正确,添加到流中 + tcp_segments.append(payload) + expected_seq += len(payload) + processed_packets += 1 + elif seq < expected_seq: + # 处理重叠数据 + overlap = expected_seq - seq + if len(payload) > overlap: + # 去除重叠部分,添加剩余数据 + tcp_segments.append(payload[overlap:]) + expected_seq = seq + len(payload) + processed_packets += 1 + else: + # 完全重叠,跳过 + skipped_packets += 1 + else: + # 序列号大于期望值,说明有数据包丢失 + gap = seq - expected_seq + self.log_with_timestamp(f"检测到数据包间隙: {gap} 字节,从序列号 {expected_seq} 到 {seq}") + # 跳过间隙,继续处理 + tcp_segments.append(payload) + expected_seq = seq + len(payload) + processed_packets += 1 + + # 合并所有段 + tcp_stream = b''.join(tcp_segments) + + self.log_with_timestamp(f"TCP流重组完成:") + self.log_with_timestamp(f" - 处理了 {processed_packets} 个数据包") + self.log_with_timestamp(f" - 跳过了 {skipped_packets} 个重复包") + self.log_with_timestamp(f" - 重组后大小: {len(tcp_stream) / (1024*1024):.2f} MB") + + # 验证RTMP握手 + if len(tcp_stream) >= 4: + first_bytes = tcp_stream[:4] + self.log_with_timestamp(f"重组流前4字节: {first_bytes.hex()}") + if first_bytes[0] == 0x03: + self.log_with_timestamp("重组流包含正确的RTMP握手") + else: + self.log_with_timestamp("警告:重组流可能不是有效的RTMP流") + + # 建立连接 + if not self.establish_tcp_connection(None): + self.log_with_timestamp("无法建立连接") + return + + # 启动响应读取线程 + self.stop_reading.clear() + reader_thread = threading.Thread(target=self.response_reader) + reader_thread.daemon = True + reader_thread.start() + + self.log_with_timestamp(f"开始发送TCP流数据") + + try: + # 使用更小的块大小,确保RTMP协议完整性 + chunk_size = 1024 # 减小到1KB,更安全 + sent_bytes = 0 + chunk_count = 0 + last_progress_time = time.time() + self.start_time = time.time() + + while sent_bytes < len(tcp_stream): + current_time = time.time() + + # 每5秒显示一次进度 + if current_time - last_progress_time >= 5.0: + progress = (sent_bytes / len(tcp_stream)) * 100 + self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB)") + last_progress_time = current_time + + # 计算当前块大小 + remaining = len(tcp_stream) - sent_bytes + current_chunk_size = min(chunk_size, remaining) + + # 发送数据块 + chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size] + + try: + self.socket.settimeout(10) # 10秒超时 + bytes_sent = self.socket.send(chunk) + + if bytes_sent == 0: + self.log_with_timestamp("连接已断开") + break + + sent_bytes += bytes_sent + chunk_count += 1 + self.total_bytes_sent += bytes_sent + self.last_activity_time = time.time() + + # 如果没有完全发送当前块,继续发送剩余部分 + if bytes_sent < len(chunk): + self.log_with_timestamp(f"部分发送: {bytes_sent}/{len(chunk)} 字节") + continue + + except socket.timeout: + self.log_with_timestamp(f"发送数据块超时,继续...") + continue + except socket.error as e: + self.log_with_timestamp(f"发送数据块失败: {e}") + break + + # 更温和的流控制 + if chunk_count % 50 == 0: # 每50个块休眠 + time.sleep(0.005) # 5ms休眠 + + self.log_with_timestamp(f"数据发送完成,等待服务器处理...") + time.sleep(3) # 等待服务器处理完成 + + self.log_with_timestamp(f"\n=== 发送完成 ===") + self.log_with_timestamp(f"成功发送了 {chunk_count} 个数据块") + self.log_with_timestamp(f"总共发送了 {self.total_bytes_sent} 字节数据 ({self.total_bytes_sent / (1024*1024):.2f} MB)") + if self.start_time: + total_time = time.time() - self.start_time + self.log_with_timestamp(f"总耗时: {total_time:.3f} 秒") + if total_time > 0: + self.log_with_timestamp(f"平均发送速率: {(self.total_bytes_sent / total_time) / (1024*1024):.2f} MB/s") + + except Exception as e: + self.log_with_timestamp(f"重放过程中出错: {e}") + import traceback + traceback.print_exc() finally: self.stop_reading.set() if self.socket: + self.log_with_timestamp("关闭TCP连接...") + try: + self.socket.shutdown(socket.SHUT_RDWR) + except: + pass self.socket.close() self.socket = None + + def list_all_connections(self): + """列出所有连接""" + self.log_with_timestamp("分析PCAP文件,列出所有连接...") + try: + reader = PcapReader(self.pcap_file) + connections = defaultdict(lambda: defaultdict(int)) + + for packet in reader: + if IP not in packet or TCP not in packet: + continue + + # 统计每个源端口和目标端口的流量 + src_port = packet[TCP].sport + dst_port = packet[TCP].dport + payload_size = len(packet[Raw].load) if Raw in packet else 0 + + connections[dst_port]['sent'] += payload_size + connections[src_port]['received'] += payload_size + reader.close() + # 按照发送和接收的字节数排序 + sorted_connections = sorted(connections.items(), key=lambda item: item[1]['sent'] + item[1]['received'], reverse=True) + + self.log_with_timestamp("发现的连接:") + for port, stats in sorted_connections: + self.log_with_timestamp(f"端口 {port}: 发送 {stats['sent']} 字节, 接收 {stats['received']} 字节") + + except Exception as e: + self.log_with_timestamp(f"列出连接时出错: {e}") + import traceback + traceback.print_exc() + + def find_largest_connection(self, src_ip=None): + """自动选择数据量最大的连接""" + self.log_with_timestamp("正在查找数据量最大的连接...") + try: + reader = PcapReader(self.pcap_file) + connections = defaultdict(int) + + for packet in reader: + if IP not in packet or TCP not in packet: + continue + + # 只统计发送到目标IP的数据 + if packet[IP].dst != self.target_ip: + continue + + # 统计每个源端口的流量 + src_port = packet[TCP].sport + payload_size = len(packet[Raw].load) if Raw in packet else 0 + connections[src_port] += payload_size + + reader.close() + + # 找到数据量最大的源端口 + if connections: + largest_port = max(connections.items(), key=lambda item: item[1])[0] + self.log_with_timestamp(f"数据量最大的连接: 源端口 {largest_port}") + return largest_port + else: + self.log_with_timestamp("未找到合适的连接") + return None + + except Exception as e: + self.log_with_timestamp(f"查找最大连接时出错: {e}") + import traceback + traceback.print_exc() + return None + def main(): - parser = argparse.ArgumentParser(description='Wireshark数据包重放工具') + parser = argparse.ArgumentParser(description='RTMP数据包重放工具 - 支持自动选择最大连接') parser.add_argument('pcap_file', help='pcap文件路径') parser.add_argument('target_ip', help='目标IP地址') parser.add_argument('target_port', type=int, help='目标端口') parser.add_argument('--delay', type=float, default=0, help='数据包发送间隔(秒)') parser.add_argument('--src-ip', help='过滤源IP地址') - parser.add_argument('--src-port', type=int, help='过滤源端口') + parser.add_argument('--src-port', type=int, help='过滤源端口 (留空自动选择最大连接)') parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型') + parser.add_argument('--auto-select', action='store_true', help='自动选择数据量最大的连接') + parser.add_argument('--list-connections', action='store_true', help='列出所有连接并退出') args = parser.parse_args() replayer = PacketReplayer(args.pcap_file, args.target_ip, args.target_port) - replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay) + + # 如果指定了列出连接,就分析并显示所有连接 + if args.list_connections: + replayer.list_all_connections() + return + + # 如果启用自动选择或没有指定源端口,选择最大的连接 + if args.auto_select or args.src_port is None: + best_port = replayer.find_largest_connection(args.src_ip) + if best_port: + print(f"自动选择数据量最大的连接: 端口 {best_port}") + replayer.replay_packets(args.src_ip, best_port, args.protocol, args.delay) + else: + print("未找到合适的连接") + else: + replayer.replay_packets(args.src_ip, args.src_port, args.protocol, args.delay) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/scripts/pcap_analyzer.py b/scripts/pcap_analyzer.py new file mode 100644 index 0000000..1b6ca6d --- /dev/null +++ b/scripts/pcap_analyzer.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +import argparse +from scapy.all import rdpcap, IP, TCP, UDP, Raw, PcapReader +import sys +from collections import defaultdict + +def analyze_pcap(pcap_file, target_port=1935): + """分析pcap文件的详细信息""" + print(f"分析PCAP文件: {pcap_file}") + print(f"目标端口: {target_port}") + print("=" * 60) + + try: + reader = PcapReader(pcap_file) + + # 统计信息 + total_packets = 0 + tcp_packets = 0 + udp_packets = 0 + other_packets = 0 + + # 连接统计 + connections = defaultdict(lambda: {'packets': 0, 'bytes': 0, 'to_target': 0, 'from_target': 0}) + target_connections = defaultdict(lambda: {'packets': 0, 'bytes': 0}) + + # 数据大小统计 + total_tcp_data = 0 + target_tcp_data = 0 + + # IP统计 + ip_stats = defaultdict(lambda: {'send_bytes': 0, 'recv_bytes': 0, 'packets': 0}) + + for packet in reader: + total_packets += 1 + + if IP in packet: + src_ip = packet[IP].src + dst_ip = packet[IP].dst + + if TCP in packet: + tcp_packets += 1 + src_port = packet[TCP].sport + dst_port = packet[TCP].dport + + conn_key = f"{src_ip}:{src_port} -> {dst_ip}:{dst_port}" + connections[conn_key]['packets'] += 1 + + if Raw in packet: + payload_size = len(packet[Raw].load) + connections[conn_key]['bytes'] += payload_size + total_tcp_data += payload_size + + # 统计IP流量 + ip_stats[src_ip]['send_bytes'] += payload_size + ip_stats[dst_ip]['recv_bytes'] += payload_size + ip_stats[src_ip]['packets'] += 1 + + # 检查是否与目标端口相关 + if dst_port == target_port: + connections[conn_key]['to_target'] += payload_size + target_tcp_data += payload_size + target_key = f"{src_ip}:{src_port}" + target_connections[target_key]['packets'] += 1 + target_connections[target_key]['bytes'] += payload_size + elif src_port == target_port: + connections[conn_key]['from_target'] += payload_size + + elif UDP in packet: + udp_packets += 1 + else: + other_packets += 1 + else: + other_packets += 1 + + reader.close() + + # 输出统计结果 + print(f"总包数: {total_packets}") + print(f"TCP包数: {tcp_packets}") + print(f"UDP包数: {udp_packets}") + print(f"其他包数: {other_packets}") + print(f"TCP总数据量: {total_tcp_data / (1024*1024):.2f} MB") + print(f"目标端口({target_port})数据量: {target_tcp_data / (1024*1024):.2f} MB") + print() + + # 显示IP统计 + print("IP地址统计:") + print("-" * 60) + for ip, stats in sorted(ip_stats.items(), key=lambda x: x[1]['send_bytes'], reverse=True)[:10]: + send_mb = stats['send_bytes'] / (1024*1024) + recv_mb = stats['recv_bytes'] / (1024*1024) + print(f"{ip:15} 发送: {send_mb:8.2f}MB 接收: {recv_mb:8.2f}MB 包数: {stats['packets']}") + print() + + # 显示目标端口连接统计 + print(f"目标端口({target_port})连接统计:") + print("-" * 60) + for conn, stats in sorted(target_connections.items(), key=lambda x: x[1]['bytes'], reverse=True): + mb = stats['bytes'] / (1024*1024) + print(f"{conn:25} 包数: {stats['packets']:6} 数据量: {mb:8.2f}MB") + print() + + # 显示前10个最大的连接 + print("前10个最大的连接:") + print("-" * 80) + for conn, stats in sorted(connections.items(), key=lambda x: x[1]['bytes'], reverse=True)[:10]: + mb = stats['bytes'] / (1024*1024) + to_target_mb = stats['to_target'] / (1024*1024) + from_target_mb = stats['from_target'] / (1024*1024) + print(f"{conn:40} 总量: {mb:8.2f}MB 发往目标: {to_target_mb:6.2f}MB 来自目标: {from_target_mb:6.2f}MB") + + except Exception as e: + print(f"分析PCAP文件时出错: {e}") + import traceback + traceback.print_exc() + +def main(): + parser = argparse.ArgumentParser(description='PCAP文件分析工具') + parser.add_argument('pcap_file', help='pcap文件路径') + parser.add_argument('--target-port', type=int, default=1935, help='目标端口 (默认: 1935)') + + args = parser.parse_args() + analyze_pcap(args.pcap_file, args.target_port) + +if __name__ == '__main__': + main()