diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index ae0f2ec..4dcfc99 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -296,18 +296,18 @@ func (r *Recorder) Run() (err error) { if vt == nil { vt = sub.VideoReader.Track - switch vt.ICodecCtx.GetBase().(type) { + switch video.ICodecCtx.GetBase().(type) { case *codec.H264Ctx: track := r.muxer.AddTrack(box.MP4_CODEC_H264) videoTrack = track - track.ICodecCtx = vt.ICodecCtx + track.ICodecCtx = video.ICodecCtx case *codec.H265Ctx: track := r.muxer.AddTrack(box.MP4_CODEC_H265) videoTrack = track - track.ICodecCtx = vt.ICodecCtx + track.ICodecCtx = video.ICodecCtx } } - ctx := vt.ICodecCtx.(pkg.IVideoCodecCtx) + ctx := video.ICodecCtx.(pkg.IVideoCodecCtx) if videoTrackCtx, ok := videoTrack.ICodecCtx.(pkg.IVideoCodecCtx); ok && videoTrackCtx != ctx { width, height := uint32(ctx.Width()), uint32(ctx.Height()) oldWidth, oldHeight := uint32(videoTrackCtx.Width()), uint32(videoTrackCtx.Height()) @@ -322,6 +322,17 @@ func (r *Recorder) Run() (err error) { at, vt = nil, nil if vr := sub.VideoReader; vr != nil { vr.ResetAbsTime() + vt = vr.Track + switch video.ICodecCtx.GetBase().(type) { + case *codec.H264Ctx: + track := r.muxer.AddTrack(box.MP4_CODEC_H264) + videoTrack = track + track.ICodecCtx = video.ICodecCtx + case *codec.H265Ctx: + track := r.muxer.AddTrack(box.MP4_CODEC_H265) + videoTrack = track + track.ICodecCtx = video.ICodecCtx + } } if ar := sub.AudioReader; ar != nil { ar.ResetAbsTime() diff --git a/plugin/mp4/pkg/track.go b/plugin/mp4/pkg/track.go index 45f90a8..57a05d9 100644 --- a/plugin/mp4/pkg/track.go +++ b/plugin/mp4/pkg/track.go @@ -73,9 +73,14 @@ func (track *Track) makeElstBox() *EditListBox { // elst.entrys.entrys[0].mediaRateFraction = 0 // } - //简单起见,mediaTime先固定为0,即不延迟播放 + //使用第一个sample的时间戳作为MediaTime,确保时间轴对齐 + firstTimestamp := track.Samplelist[0].Timestamp + firstCTS := track.Samplelist[0].CTS + mediaTime := int64(firstTimestamp) + int64(firstCTS) + entrys[entryCount-1].SegmentDuration = uint64(track.Duration) - entrys[entryCount-1].MediaTime = 0 + // MediaTime应该是第一个sample的PTS (DTS + CTS) + entrys[entryCount-1].MediaTime = mediaTime entrys[entryCount-1].MediaRateInteger = 0x0001 entrys[entryCount-1].MediaRateFraction = 0 diff --git a/scripts/packet_replayer.py b/scripts/packet_replayer.py index be59f25..9906a5d 100644 --- a/scripts/packet_replayer.py +++ b/scripts/packet_replayer.py @@ -261,6 +261,200 @@ class PacketReplayer: return max(0, wait_time) + def replay_all_connections(self, src_ip=None, protocol=None, delay=0): + """重放所有连接,检测到新连接时自动重连""" + self.log_with_timestamp("开始加载所有连接的数据包...") + + try: + reader = PcapReader(self.pcap_file) + # 按源端口分组所有数据包 + connections = defaultdict(list) + + for packet in reader: + if IP not in packet or TCP not in packet: + continue + + # 只收集发送到目标端口的包 + if packet[TCP].dport == self.target_port and Raw in packet: + src_port = packet[TCP].sport + packet_info = { + 'timestamp': float(packet.time), + 'payload': packet[Raw].load, + 'seq': packet[TCP].seq, + 'src_port': src_port + } + connections[src_port].append(packet_info) + + reader.close() + + if not connections: + self.log_with_timestamp("没有找到任何连接") + return + + self.log_with_timestamp(f"发现 {len(connections)} 个连接") + for port, packets in sorted(connections.items()): + total_size = sum(len(p['payload']) for p in packets) + self.log_with_timestamp(f" 端口 {port}: {len(packets)} 个包, {total_size / (1024*1024):.2f} MB") + + # 按时间顺序处理所有连接 + self.log_with_timestamp("\n开始按时间顺序重放所有连接...") + + # 将所有连接的包合并并按时间排序 + all_packets = [] + for src_port, packets in connections.items(): + for pkt in packets: + all_packets.append(pkt) + + all_packets.sort(key=lambda x: x['timestamp']) + self.log_with_timestamp(f"总共 {len(all_packets)} 个数据包") + + # 按连接分组重放 + current_connection = None + connection_packets = [] + + for pkt in all_packets: + src_port = pkt['src_port'] + + # 检测到新连接 + if current_connection != src_port: + # 先发送之前连接的数据 + if connection_packets: + self.log_with_timestamp(f"\n[连接 {current_connection}] 发送 {len(connection_packets)} 个包") + self._send_connection_packets(connection_packets) + + # 开始新连接 + current_connection = src_port + connection_packets = [] + self.log_with_timestamp(f"\n[新连接] 端口 {src_port}") + + connection_packets.append(pkt) + + # 发送最后一个连接的数据 + if connection_packets: + self.log_with_timestamp(f"\n[连接 {current_connection}] 发送 {len(connection_packets)} 个包") + self._send_connection_packets(connection_packets) + + self.log_with_timestamp("\n所有连接重放完成") + + except Exception as e: + self.log_with_timestamp(f"重放所有连接时出错: {e}") + import traceback + traceback.print_exc() + + def _send_connection_packets(self, packets): + """发送单个连接的所有数据包""" + # 按序列号重组TCP流 + packets.sort(key=lambda x: x['seq']) + + tcp_segments = [] + chunk_sizes = [] + chunk_timestamps = [] + expected_seq = packets[0]['seq'] + + for pkt in packets: + seq = pkt['seq'] + payload = pkt['payload'] + timestamp = pkt['timestamp'] + + if seq == expected_seq: + tcp_segments.append(payload) + chunk_sizes.append(len(payload)) + chunk_timestamps.append(timestamp) + expected_seq += len(payload) + elif seq < expected_seq: + overlap = expected_seq - seq + if len(payload) > overlap: + tcp_segments.append(payload[overlap:]) + chunk_sizes.append(len(payload[overlap:])) + chunk_timestamps.append(timestamp) + expected_seq = seq + len(payload) + else: + tcp_segments.append(payload) + chunk_sizes.append(len(payload)) + chunk_timestamps.append(timestamp) + expected_seq = seq + len(payload) + + tcp_stream = b''.join(tcp_segments) + + if not tcp_stream: + self.log_with_timestamp(" 警告:没有数据可发送") + return + + self.log_with_timestamp(f" 重组完成: {len(tcp_stream) / (1024*1024):.2f} MB, {len(chunk_sizes)} 个包") + + # 建立新连接 + if not self.establish_tcp_connection(None): + self.log_with_timestamp(" 无法建立连接") + return + + # 启动响应读取线程 + self.stop_reading.clear() + reader_thread = threading.Thread(target=self.response_reader) + reader_thread.daemon = True + reader_thread.start() + + # 发送数据 + try: + sent_bytes = 0 + chunk_index = 0 + replay_start_time = time.time() + first_packet_time = chunk_timestamps[0] if chunk_timestamps else 0 + + while sent_bytes < len(tcp_stream) and chunk_index < len(chunk_sizes): + # 计算等待时间 + if chunk_index < len(chunk_timestamps): + target_time = chunk_timestamps[chunk_index] - first_packet_time + elapsed_time = time.time() - replay_start_time + wait_time = target_time - elapsed_time + + if wait_time > 0: + if wait_time > 5.0: + wait_time = 5.0 + time.sleep(wait_time) + + current_chunk_size = chunk_sizes[chunk_index] + remaining = len(tcp_stream) - sent_bytes + current_chunk_size = min(current_chunk_size, remaining) + + chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size] + chunk_sent = 0 + + while chunk_sent < len(chunk): + try: + self.socket.settimeout(10) + bytes_sent = self.socket.send(chunk[chunk_sent:]) + + if bytes_sent == 0: + break + + chunk_sent += bytes_sent + sent_bytes += bytes_sent + self.total_bytes_sent += bytes_sent + self.last_activity_time = time.time() + + except socket.timeout: + continue + except socket.error: + break + + if chunk_sent == len(chunk): + chunk_index += 1 + else: + break + + self.log_with_timestamp(f" 发送完成: {sent_bytes / (1024*1024):.2f} MB") + time.sleep(1) # 短暂等待 + + finally: + self.stop_reading.set() + if self.socket: + try: + self.socket.shutdown(socket.SHUT_RDWR) + except: + pass + self.socket.close() + self.socket = None + def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0): """重放数据包,采用正确的TCP流重组方式""" # 首先加载所有数据包 @@ -271,15 +465,17 @@ class PacketReplayer: self.log_with_timestamp("没有找到可发送的数据包") return - # 正确���TCP流重组 - 按序列号重组! + # 正确的TCP流重组 - 按序列号重组! self.log_with_timestamp("重组TCP流...") # 第一步:按序列号排序 self.data_packets.sort(key=lambda x: x['seq']) self.log_with_timestamp(f"按序列号排序完成,共 {len(self.data_packets)} 个数据包") - # 第二步:正确重组TCP流 + # 第二步:正确重组TCP流,同时保留每个包的大小和时间戳信息 tcp_segments = [] + chunk_sizes = [] # 保存每个原始包的大小 + chunk_timestamps = [] # 保存每个原始包的时间戳 expected_seq = self.data_packets[0]['seq'] self.log_with_timestamp(f"开始TCP重组,初始序列号: {expected_seq}") @@ -289,10 +485,13 @@ class PacketReplayer: for packet_info in self.data_packets: seq = packet_info['seq'] payload = packet_info['payload'] + timestamp = packet_info['timestamp'] if seq == expected_seq: # 序列号正确,添加到流中 tcp_segments.append(payload) + chunk_sizes.append(len(payload)) # 记录原始包大小 + chunk_timestamps.append(timestamp) # 记录时间戳 expected_seq += len(payload) processed_packets += 1 elif seq < expected_seq: @@ -301,6 +500,8 @@ class PacketReplayer: if len(payload) > overlap: # 去除重叠部分,添加剩余数据 tcp_segments.append(payload[overlap:]) + chunk_sizes.append(len(payload[overlap:])) # 记录实际添加的大小 + chunk_timestamps.append(timestamp) # 记录时间戳 expected_seq = seq + len(payload) processed_packets += 1 else: @@ -312,6 +513,8 @@ class PacketReplayer: self.log_with_timestamp(f"检测到数据包间隙: {gap} 字节,从序列号 {expected_seq} 到 {seq}") # 跳过间隙,继续处理 tcp_segments.append(payload) + chunk_sizes.append(len(payload)) # 记录原始包大小 + chunk_timestamps.append(timestamp) # 记录时间戳 expected_seq = seq + len(payload) processed_packets += 1 @@ -343,63 +546,97 @@ class PacketReplayer: reader_thread.daemon = True reader_thread.start() - self.log_with_timestamp(f"开始发送TCP流数据") + self.log_with_timestamp(f"开始发送TCP流数据(使用动态chunk大小和原始时间间隔)") + self.log_with_timestamp(f"原始数据包数量: {len(chunk_sizes)}") + if chunk_sizes: + avg_size = sum(chunk_sizes) / len(chunk_sizes) + min_size = min(chunk_sizes) + max_size = max(chunk_sizes) + self.log_with_timestamp(f"包大小统计 - 平均: {avg_size:.0f}, 最小: {min_size}, 最大: {max_size}") + if chunk_timestamps: + duration = chunk_timestamps[-1] - chunk_timestamps[0] + self.log_with_timestamp(f"抓包时长: {duration:.2f} 秒") try: - # 使用更小的块大小,确保RTMP协议完整性 - chunk_size = 1024 # 减小到1KB,更安全 sent_bytes = 0 chunk_count = 0 last_progress_time = time.time() self.start_time = time.time() + replay_start_time = time.time() # 重放开始时间 + first_packet_time = chunk_timestamps[0] if chunk_timestamps else 0 # 第一个包的时间戳 + chunk_index = 0 - while sent_bytes < len(tcp_stream): + while sent_bytes < len(tcp_stream) and chunk_index < len(chunk_sizes): current_time = time.time() # 每5秒显示一次进度 if current_time - last_progress_time >= 5.0: progress = (sent_bytes / len(tcp_stream)) * 100 - self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB)") + elapsed = current_time - replay_start_time + self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB) 已耗时: {elapsed:.1f}秒") last_progress_time = current_time - # 计算当前块大小 + # 计算应该等待的时间(按照原始时间间隔) + if chunk_index < len(chunk_timestamps): + target_time = chunk_timestamps[chunk_index] - first_packet_time + elapsed_time = time.time() - replay_start_time + wait_time = target_time - elapsed_time + + if wait_time > 0: + # 限制最大等待时间,避免异常 + if wait_time > 5.0: + self.log_with_timestamp(f"警告:等待时间过长 {wait_time:.3f}秒,限制为5秒") + wait_time = 5.0 + time.sleep(wait_time) + + # 使用原始数据包的大小作为chunk_size + current_chunk_size = chunk_sizes[chunk_index] remaining = len(tcp_stream) - sent_bytes - current_chunk_size = min(chunk_size, remaining) + current_chunk_size = min(current_chunk_size, remaining) - # 发送数据块 + # 发送数据块 - 确保完全发送 chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size] + chunk_sent = 0 - try: - self.socket.settimeout(10) # 10秒超时 - bytes_sent = self.socket.send(chunk) + # 循环直到当前chunk完全发送 + while chunk_sent < len(chunk): + try: + self.socket.settimeout(10) # 10秒超时 + bytes_sent = self.socket.send(chunk[chunk_sent:]) - if bytes_sent == 0: - self.log_with_timestamp("连接已断开") + if bytes_sent == 0: + self.log_with_timestamp("连接已断开") + break + + chunk_sent += bytes_sent + sent_bytes += bytes_sent + self.total_bytes_sent += bytes_sent + self.last_activity_time = time.time() + + # 如果部分发送,记录日志 + if chunk_sent < len(chunk): + self.log_with_timestamp(f"部分发送: {chunk_sent}/{len(chunk)} 字节,继续发送剩余部分...") + + except socket.timeout: + self.log_with_timestamp(f"发送数据块超时,重试...") + continue + except socket.error as e: + self.log_with_timestamp(f"发送数据块失败: {e}") break - sent_bytes += bytes_sent + # 检查是否完全发送 + if chunk_sent == len(chunk): chunk_count += 1 - self.total_bytes_sent += bytes_sent - self.last_activity_time = time.time() - - # 如果没有完全发送当前块,继续发送剩余部分 - if bytes_sent < len(chunk): - self.log_with_timestamp(f"部分发送: {bytes_sent}/{len(chunk)} 字节") - continue - - except socket.timeout: - self.log_with_timestamp(f"发送数据块超时,继续...") - continue - except socket.error as e: - self.log_with_timestamp(f"发送数据块失败: {e}") + chunk_index += 1 + else: + # 未完全发送,停止 + self.log_with_timestamp(f"无法完全发送数据块,已发送 {chunk_sent}/{len(chunk)} 字节") break - # 更温和的流控制 - if chunk_count % 50 == 0: # 每50个块休眠 - time.sleep(0.005) # 5ms休眠 + # 移除固定的流控制延迟,让TCP自己控制 self.log_with_timestamp(f"数据发送完成,等待服务器处理...") - time.sleep(3) # 等待服务器处理完成 + time.sleep(10) # 增加等待时间到10秒 self.log_with_timestamp(f"\n=== 发送完成 ===") self.log_with_timestamp(f"成功发送了 {chunk_count} 个数据块") @@ -506,6 +743,7 @@ def main(): parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型') parser.add_argument('--auto-select', action='store_true', help='自动选择数据量最大的连接') parser.add_argument('--list-connections', action='store_true', help='列出所有连接并退出') + parser.add_argument('--all', action='store_true', help='推送所有连接的数据,不区分源端口') args = parser.parse_args() @@ -516,8 +754,12 @@ def main(): replayer.list_all_connections() return + # 如果指定了--all,推送所有连接(自动重连) + if args.all: + print("推送整个pcap文件的所有连接(检测到新连接时自动重连)") + replayer.replay_all_connections(args.src_ip, args.protocol, args.delay) # 如果启用自动选择或没有指定源端口,选择最大的连接 - if args.auto_select or args.src_port is None: + elif args.auto_select or args.src_port is None: best_port = replayer.find_largest_connection(args.src_ip) if best_port: print(f"自动选择数据量最大的连接: 端口 {best_port}")