fix: when the RTMP sequence header changes, the MP4 recorder wrote two different sequence headers into the same MP4 file.

This commit is contained in:
pggiroro
2025-10-20 16:29:33 +08:00
parent 6693676fe2
commit d2bd4b2c7a
3 changed files with 298 additions and 40 deletions

View File

@@ -296,18 +296,18 @@ func (r *Recorder) Run() (err error) {
if vt == nil {
vt = sub.VideoReader.Track
switch vt.ICodecCtx.GetBase().(type) {
switch video.ICodecCtx.GetBase().(type) {
case *codec.H264Ctx:
track := r.muxer.AddTrack(box.MP4_CODEC_H264)
videoTrack = track
track.ICodecCtx = vt.ICodecCtx
track.ICodecCtx = video.ICodecCtx
case *codec.H265Ctx:
track := r.muxer.AddTrack(box.MP4_CODEC_H265)
videoTrack = track
track.ICodecCtx = vt.ICodecCtx
track.ICodecCtx = video.ICodecCtx
}
}
ctx := vt.ICodecCtx.(pkg.IVideoCodecCtx)
ctx := video.ICodecCtx.(pkg.IVideoCodecCtx)
if videoTrackCtx, ok := videoTrack.ICodecCtx.(pkg.IVideoCodecCtx); ok && videoTrackCtx != ctx {
width, height := uint32(ctx.Width()), uint32(ctx.Height())
oldWidth, oldHeight := uint32(videoTrackCtx.Width()), uint32(videoTrackCtx.Height())
@@ -322,6 +322,17 @@ func (r *Recorder) Run() (err error) {
at, vt = nil, nil
if vr := sub.VideoReader; vr != nil {
vr.ResetAbsTime()
vt = vr.Track
switch video.ICodecCtx.GetBase().(type) {
case *codec.H264Ctx:
track := r.muxer.AddTrack(box.MP4_CODEC_H264)
videoTrack = track
track.ICodecCtx = video.ICodecCtx
case *codec.H265Ctx:
track := r.muxer.AddTrack(box.MP4_CODEC_H265)
videoTrack = track
track.ICodecCtx = video.ICodecCtx
}
}
if ar := sub.AudioReader; ar != nil {
ar.ResetAbsTime()

View File

@@ -73,9 +73,14 @@ func (track *Track) makeElstBox() *EditListBox {
// elst.entrys.entrys[0].mediaRateFraction = 0
// }
//简单起见mediaTime先固定为0,即不延迟播放
//使用第一个sample的时间戳作为MediaTime确保时间轴对齐
firstTimestamp := track.Samplelist[0].Timestamp
firstCTS := track.Samplelist[0].CTS
mediaTime := int64(firstTimestamp) + int64(firstCTS)
entrys[entryCount-1].SegmentDuration = uint64(track.Duration)
entrys[entryCount-1].MediaTime = 0
// MediaTime应该是第一个sample的PTS (DTS + CTS)
entrys[entryCount-1].MediaTime = mediaTime
entrys[entryCount-1].MediaRateInteger = 0x0001
entrys[entryCount-1].MediaRateFraction = 0

View File

@@ -261,6 +261,200 @@ class PacketReplayer:
return max(0, wait_time)
def replay_all_connections(self, src_ip=None, protocol=None, delay=0):
    """Replay all connections, automatically reconnecting whenever a new
    connection (source port) is detected in the capture.

    Args:
        src_ip: accepted for interface parity with replay_packets; unused here.
        protocol: accepted for interface parity; unused here.
        delay: accepted for interface parity; unused here.
    """
    self.log_with_timestamp("开始加载所有连接的数据包...")
    try:
        reader = PcapReader(self.pcap_file)
        # Group every captured packet by its source port (one group per connection).
        connections = defaultdict(list)
        try:
            for packet in reader:
                if IP not in packet or TCP not in packet:
                    continue
                # Only collect packets sent to the target port that carry a payload.
                if packet[TCP].dport == self.target_port and Raw in packet:
                    src_port = packet[TCP].sport
                    packet_info = {
                        'timestamp': float(packet.time),
                        'payload': packet[Raw].load,
                        'seq': packet[TCP].seq,
                        'src_port': src_port
                    }
                    connections[src_port].append(packet_info)
        finally:
            # Fix: close the pcap handle even if parsing raises mid-iteration,
            # otherwise the file descriptor leaks.
            reader.close()
        if not connections:
            self.log_with_timestamp("没有找到任何连接")
            return
        self.log_with_timestamp(f"发现 {len(connections)} 个连接")
        for port, packets in sorted(connections.items()):
            total_size = sum(len(p['payload']) for p in packets)
            self.log_with_timestamp(f" 端口 {port}: {len(packets)} 个包, {total_size / (1024*1024):.2f} MB")
        # Replay all connections in global capture-time order.
        self.log_with_timestamp("\n开始按时间顺序重放所有连接...")
        # Merge every connection's packets and sort them by timestamp.
        all_packets = []
        for src_port, packets in connections.items():
            for pkt in packets:
                all_packets.append(pkt)
        all_packets.sort(key=lambda x: x['timestamp'])
        self.log_with_timestamp(f"总共 {len(all_packets)} 个数据包")
        # Walk the timeline; whenever the source port changes, flush the
        # previous connection's packets and start a new connection.
        current_connection = None
        connection_packets = []
        for pkt in all_packets:
            src_port = pkt['src_port']
            # A different source port means a new connection begins here.
            if current_connection != src_port:
                # First flush the packets accumulated for the previous connection.
                if connection_packets:
                    self.log_with_timestamp(f"\n[连接 {current_connection}] 发送 {len(connection_packets)} 个包")
                    self._send_connection_packets(connection_packets)
                # Start tracking the new connection.
                current_connection = src_port
                connection_packets = []
                self.log_with_timestamp(f"\n[新连接] 端口 {src_port}")
            connection_packets.append(pkt)
        # Flush the final connection's packets.
        if connection_packets:
            self.log_with_timestamp(f"\n[连接 {current_connection}] 发送 {len(connection_packets)} 个包")
            self._send_connection_packets(connection_packets)
        self.log_with_timestamp("\n所有连接重放完成")
    except Exception as e:
        self.log_with_timestamp(f"重放所有连接时出错: {e}")
        import traceback
        traceback.print_exc()
def _send_connection_packets(self, packets):
    """Send all packets belonging to a single connection.

    Reassembles the TCP stream by sequence number, opens a fresh connection,
    then sends the stream chunk-by-chunk paced by the original capture
    timestamps (each chunk is one original packet).

    Args:
        packets: list of dicts with 'seq', 'payload', 'timestamp', 'src_port'.
    """
    # Robustness fix: the original indexed packets[0] unconditionally and
    # raised IndexError when handed an empty list.
    if not packets:
        return
    # Reassemble the TCP stream in sequence-number order.
    packets.sort(key=lambda x: x['seq'])
    tcp_segments = []
    chunk_sizes = []
    chunk_timestamps = []
    expected_seq = packets[0]['seq']
    for pkt in packets:
        seq = pkt['seq']
        payload = pkt['payload']
        timestamp = pkt['timestamp']
        if seq == expected_seq:
            # In-order segment: append as-is.
            tcp_segments.append(payload)
            chunk_sizes.append(len(payload))
            chunk_timestamps.append(timestamp)
            expected_seq += len(payload)
        elif seq < expected_seq:
            # Retransmission / overlap: keep only the unseen tail, if any.
            overlap = expected_seq - seq
            if len(payload) > overlap:
                tcp_segments.append(payload[overlap:])
                chunk_sizes.append(len(payload[overlap:]))
                chunk_timestamps.append(timestamp)
                expected_seq = seq + len(payload)
        else:
            # Gap in the capture: append the payload anyway and resync.
            tcp_segments.append(payload)
            chunk_sizes.append(len(payload))
            chunk_timestamps.append(timestamp)
            expected_seq = seq + len(payload)
    tcp_stream = b''.join(tcp_segments)
    if not tcp_stream:
        self.log_with_timestamp(" 警告:没有数据可发送")
        return
    self.log_with_timestamp(f" 重组完成: {len(tcp_stream) / (1024*1024):.2f} MB, {len(chunk_sizes)} 个包")
    # Establish a fresh TCP connection for this replayed stream.
    if not self.establish_tcp_connection(None):
        self.log_with_timestamp(" 无法建立连接")
        return
    # Start the background thread that drains server responses.
    self.stop_reading.clear()
    reader_thread = threading.Thread(target=self.response_reader)
    reader_thread.daemon = True
    reader_thread.start()
    # Send the stream, pacing each chunk by its original capture timestamp.
    try:
        sent_bytes = 0
        chunk_index = 0
        replay_start_time = time.time()
        first_packet_time = chunk_timestamps[0] if chunk_timestamps else 0
        while sent_bytes < len(tcp_stream) and chunk_index < len(chunk_sizes):
            # Wait until the chunk's original relative send time has elapsed.
            if chunk_index < len(chunk_timestamps):
                target_time = chunk_timestamps[chunk_index] - first_packet_time
                elapsed_time = time.time() - replay_start_time
                wait_time = target_time - elapsed_time
                if wait_time > 0:
                    # Cap abnormal gaps so a capture pause can't stall replay.
                    if wait_time > 5.0:
                        wait_time = 5.0
                    time.sleep(wait_time)
            current_chunk_size = chunk_sizes[chunk_index]
            remaining = len(tcp_stream) - sent_bytes
            current_chunk_size = min(current_chunk_size, remaining)
            chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size]
            chunk_sent = 0
            # Inner loop: keep sending until the whole chunk is on the wire.
            while chunk_sent < len(chunk):
                try:
                    self.socket.settimeout(10)
                    bytes_sent = self.socket.send(chunk[chunk_sent:])
                    if bytes_sent == 0:
                        # Peer closed the connection.
                        break
                    chunk_sent += bytes_sent
                    sent_bytes += bytes_sent
                    self.total_bytes_sent += bytes_sent
                    self.last_activity_time = time.time()
                except socket.timeout:
                    # Transient stall: retry sending the remainder.
                    continue
                except socket.error:
                    break
            if chunk_sent == len(chunk):
                chunk_index += 1
            else:
                # Could not finish this chunk — abort the connection's replay.
                break
        self.log_with_timestamp(f" 发送完成: {sent_bytes / (1024*1024):.2f} MB")
        time.sleep(1)  # brief pause so the server can drain the stream
    finally:
        self.stop_reading.set()
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Fix: narrowed from a bare except; shutdown on a dead socket
                # raises OSError, and nothing else should be swallowed here.
                pass
            self.socket.close()
            self.socket = None
def replay_packets(self, src_ip=None, src_port=None, protocol=None, delay=0):
"""重放数据包采用正确的TCP流重组方式"""
# 首先加载所有数据包
@@ -271,15 +465,17 @@ class PacketReplayer:
self.log_with_timestamp("没有找到可发送的数据包")
return
# 正确的TCP流重组 - 按序列号重组!
# 正确TCP流重组 - 按序列号重组!
self.log_with_timestamp("重组TCP流...")
# 第一步:按序列号排序
self.data_packets.sort(key=lambda x: x['seq'])
self.log_with_timestamp(f"按序列号排序完成,共 {len(self.data_packets)} 个数据包")
# 第二步正确重组TCP流
# 第二步正确重组TCP流,同时保留每个包的大小和时间戳信息
tcp_segments = []
chunk_sizes = [] # 保存每个原始包的大小
chunk_timestamps = [] # 保存每个原始包的时间戳
expected_seq = self.data_packets[0]['seq']
self.log_with_timestamp(f"开始TCP重组初始序列号: {expected_seq}")
@@ -289,10 +485,13 @@ class PacketReplayer:
for packet_info in self.data_packets:
seq = packet_info['seq']
payload = packet_info['payload']
timestamp = packet_info['timestamp']
if seq == expected_seq:
# 序列号正确,添加到流中
tcp_segments.append(payload)
chunk_sizes.append(len(payload)) # 记录原始包大小
chunk_timestamps.append(timestamp) # 记录时间戳
expected_seq += len(payload)
processed_packets += 1
elif seq < expected_seq:
@@ -301,6 +500,8 @@ class PacketReplayer:
if len(payload) > overlap:
# 去除重叠部分,添加剩余数据
tcp_segments.append(payload[overlap:])
chunk_sizes.append(len(payload[overlap:])) # 记录实际添加的大小
chunk_timestamps.append(timestamp) # 记录时间戳
expected_seq = seq + len(payload)
processed_packets += 1
else:
@@ -312,6 +513,8 @@ class PacketReplayer:
self.log_with_timestamp(f"检测到数据包间隙: {gap} 字节,从序列号 {expected_seq}{seq}")
# 跳过间隙,继续处理
tcp_segments.append(payload)
chunk_sizes.append(len(payload)) # 记录原始包大小
chunk_timestamps.append(timestamp) # 记录时间戳
expected_seq = seq + len(payload)
processed_packets += 1
@@ -343,63 +546,97 @@ class PacketReplayer:
reader_thread.daemon = True
reader_thread.start()
self.log_with_timestamp(f"开始发送TCP流数据")
self.log_with_timestamp(f"开始发送TCP流数据使用动态chunk大小和原始时间间隔")
self.log_with_timestamp(f"原始数据包数量: {len(chunk_sizes)}")
if chunk_sizes:
avg_size = sum(chunk_sizes) / len(chunk_sizes)
min_size = min(chunk_sizes)
max_size = max(chunk_sizes)
self.log_with_timestamp(f"包大小统计 - 平均: {avg_size:.0f}, 最小: {min_size}, 最大: {max_size}")
if chunk_timestamps:
duration = chunk_timestamps[-1] - chunk_timestamps[0]
self.log_with_timestamp(f"抓包时长: {duration:.2f}")
try:
# 使用更小的块大小确保RTMP协议完整性
chunk_size = 1024 # 减小到1KB更安全
sent_bytes = 0
chunk_count = 0
last_progress_time = time.time()
self.start_time = time.time()
replay_start_time = time.time() # 重放开始时间
first_packet_time = chunk_timestamps[0] if chunk_timestamps else 0 # 第一个包的时间戳
chunk_index = 0
while sent_bytes < len(tcp_stream):
while sent_bytes < len(tcp_stream) and chunk_index < len(chunk_sizes):
current_time = time.time()
# 每5秒显示一次进度
if current_time - last_progress_time >= 5.0:
progress = (sent_bytes / len(tcp_stream)) * 100
self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB)")
elapsed = current_time - replay_start_time
self.log_with_timestamp(f"发送进度: {progress:.1f}% ({sent_bytes / (1024*1024):.2f}/{len(tcp_stream) / (1024*1024):.2f} MB) 已耗时: {elapsed:.1f}")
last_progress_time = current_time
# 计算当前块大小
# 计算应该等待的时间(按照原始时间间隔)
if chunk_index < len(chunk_timestamps):
target_time = chunk_timestamps[chunk_index] - first_packet_time
elapsed_time = time.time() - replay_start_time
wait_time = target_time - elapsed_time
if wait_time > 0:
# 限制最大等待时间,避免异常
if wait_time > 5.0:
self.log_with_timestamp(f"警告:等待时间过长 {wait_time:.3f}限制为5秒")
wait_time = 5.0
time.sleep(wait_time)
# 使用原始数据包的大小作为chunk_size
current_chunk_size = chunk_sizes[chunk_index]
remaining = len(tcp_stream) - sent_bytes
current_chunk_size = min(chunk_size, remaining)
current_chunk_size = min(current_chunk_size, remaining)
# 发送数据块
# 发送数据块 - 确保完全发送
chunk = tcp_stream[sent_bytes:sent_bytes + current_chunk_size]
chunk_sent = 0
try:
self.socket.settimeout(10) # 10秒超时
bytes_sent = self.socket.send(chunk)
# 循环直到当前chunk完全发送
while chunk_sent < len(chunk):
try:
self.socket.settimeout(10) # 10秒超时
bytes_sent = self.socket.send(chunk[chunk_sent:])
if bytes_sent == 0:
self.log_with_timestamp("连接已断开")
if bytes_sent == 0:
self.log_with_timestamp("连接已断开")
break
chunk_sent += bytes_sent
sent_bytes += bytes_sent
self.total_bytes_sent += bytes_sent
self.last_activity_time = time.time()
# 如果部分发送,记录日志
if chunk_sent < len(chunk):
self.log_with_timestamp(f"部分发送: {chunk_sent}/{len(chunk)} 字节,继续发送剩余部分...")
except socket.timeout:
self.log_with_timestamp(f"发送数据块超时,重试...")
continue
except socket.error as e:
self.log_with_timestamp(f"发送数据块失败: {e}")
break
sent_bytes += bytes_sent
# 检查是否完全发送
if chunk_sent == len(chunk):
chunk_count += 1
self.total_bytes_sent += bytes_sent
self.last_activity_time = time.time()
# 如果没有完全发送当前块,继续发送剩余部分
if bytes_sent < len(chunk):
self.log_with_timestamp(f"部分发送: {bytes_sent}/{len(chunk)} 字节")
continue
except socket.timeout:
self.log_with_timestamp(f"发送数据块超时,继续...")
continue
except socket.error as e:
self.log_with_timestamp(f"发送数据块失败: {e}")
chunk_index += 1
else:
# 未完全发送,停止
self.log_with_timestamp(f"无法完全发送数据块,已发送 {chunk_sent}/{len(chunk)} 字节")
break
# 更温和的流控制
if chunk_count % 50 == 0: # 每50个块休眠
time.sleep(0.005) # 5ms休眠
# 移除固定的流控制延迟让TCP自己控制
self.log_with_timestamp(f"数据发送完成,等待服务器处理...")
time.sleep(3) # 等待服务器处理完成
time.sleep(10) # 增加等待时间到10秒
self.log_with_timestamp(f"\n=== 发送完成 ===")
self.log_with_timestamp(f"成功发送了 {chunk_count} 个数据块")
@@ -506,6 +743,7 @@ def main():
parser.add_argument('--protocol', choices=['tcp', 'udp'], help='过滤协议类型')
parser.add_argument('--auto-select', action='store_true', help='自动选择数据量最大的连接')
parser.add_argument('--list-connections', action='store_true', help='列出所有连接并退出')
parser.add_argument('--all', action='store_true', help='推送所有连接的数据,不区分源端口')
args = parser.parse_args()
@@ -516,8 +754,12 @@ def main():
replayer.list_all_connections()
return
# 如果指定了--all推送所有连接自动重连
if args.all:
print("推送整个pcap文件的所有连接检测到新连接时自动重连")
replayer.replay_all_connections(args.src_ip, args.protocol, args.delay)
# 如果启用自动选择或没有指定源端口,选择最大的连接
if args.auto_select or args.src_port is None:
elif args.auto_select or args.src_port is None:
best_port = replayer.find_largest_connection(args.src_ip)
if best_port:
print(f"自动选择数据量最大的连接: 端口 {best_port}")