diff --git a/RELEASE_NOTES_5.0.x_CN.md b/RELEASE_NOTES_5.0.x_CN.md new file mode 100644 index 0000000..eecdf6f --- /dev/null +++ b/RELEASE_NOTES_5.0.x_CN.md @@ -0,0 +1,111 @@ +# Monibuca v5.0.x Release Notes + +## v5.0.2 (2025-06-05) + +### 🎉 新功能 (New Features) + +#### 核心功能 +- **降低延迟** - 禁用了TCP WebRTC的重放保护功能,降低了延迟 +- **配置系统增强** - 支持更多配置格式(支持配置项中插入`-`、`_`和大写字母),提升配置灵活性 +- **原始数据检查** - 新增原始数据无帧检查功能,提升数据处理稳定性 +- **MP4循环读取** - 支持MP4文件循环读取功能(通过配置 pull 配置下的 `loop` 配置) +- **S3插件** - 新增S3存储插件,支持云存储集成 +- **TCP读写缓冲配置** - 新增TCP连接读写缓冲区配置选项(针对高并发下的吞吐能力增强) +- **拉流测试模式** - 新增拉流测试模式选项(可以选择拉流时不发布),便于调试和测试 +- **SEI API格式扩展** - 扩展SEI API支持更多数据格式 +- **Hook扩展** - 新增更多Hook回调点,增强扩展性 +- **定时任务插件** - 新增crontab定时任务插件 +- **服务器抓包** - 新增服务器抓包功能(调用`tcpdump`),支持TCP和UDP协议,API 说明见 [tcpdump](https://api.monibuca.com/api-301117332) + +#### GB28181协议增强 +- **平台配置支持** - GB28181现在支持从config.yaml中添加平台和平台通道配置 +- **子码流播放** - 支持GB28181子码流播放功能 +- **SDP优化** - 优化invite SDP中的mediaip和sipip处理 +- **本地端口保存** - 修复GB28181本地端口保存到数据库的问题 + +#### MP4功能增强 +- **FLV格式下载** - 支持从MP4录制文件下载FLV格式 +- **下载功能修复** - 修复MP4下载功能的相关问题 +- **恢复功能修复** - 修复MP4恢复功能 + +### 🐛 问题修复 (Bug Fixes) + +#### 网络通信 +- **TCP读取阻塞** - 修复TCP读取阻塞问题(增加了读取超时设置) +- **RTSP内存泄漏** - 修复RTSP协议的内存泄漏问题 +- **RTSP音视频标识** - 修复RTSP无音频或视频标识的问题 + +#### GB28181协议 +- **任务管理** - 使用task.Manager解决注册处理器的问题 +- **计划长度** - 修复plan.length为168的问题 +- **注册频率** - 修复GB28181注册过快导致启动过多任务的问题 +- **联系信息** - 修复GB28181获取错误联系信息的问题 + +#### RTMP协议 +- **时间戳处理** - 修复RTMP时间戳开头跳跃问题 + +### 🛠️ 优化改进 (Improvements) + +#### Docker支持 +- **tcpdump工具** - Docker镜像中新增tcpdump网络诊断工具 + +#### Linux平台优化 +- **SIP请求优化** - Linux平台移除SIP请求中的viaheader + +### 👥 贡献者 (Contributors) +- langhuihui +- pggiroro +- banshan + +--- + +## v5.0.1 (2025-05-21) + +### 🎉 新功能 (New Features) + +#### WebRTC增强 +- **H265支持** - 新增WebRTC对H265编码的支持,提升视频质量和压缩效率 + +#### GB28181协议增强 +- **订阅功能扩展** - GB28181模块现在支持订阅报警、移动位置、目录信息 +- **通知请求** - 支持接收通知请求,增强与设备的交互能力 + +#### Docker优化 +- **FFmpeg集成** - Docker镜像中新增FFmpeg工具,支持更多音视频处理场景 +- **多架构支持** - 新增Docker多架构构建支持 + +### 🐛 问题修复 (Bug Fixes) + +#### Docker相关 +- **构建问题** - 修复Docker构建过程中的多个问题 +- **构建优化** - 优化Docker构建流程,提升构建效率 + +#### RTMP协议 +- **时间戳处理** - 修复RTMP第一个chunk类型3需要添加时间戳的问题 + +#### GB28181协议 +- **路径匹配** - 修复GB28181模块中播放流路径的正则表达式匹配问题 + +#### MP4处理 +- **stsz box** - 修复stsz box采样大小的问题 +- **G711音频** - 修复拉取MP4文件时读取G711音频的问题 +- **H265解析** - 修复H265 MP4文件解析问题 + +### 🛠️ 优化改进 (Improvements) + +#### 代码质量 +- **错误处理** - 新增maxcount错误处理机制 +- **文档更新** - 更新README文档和go.mod配置 + +#### 构建系统 +- **ARM架构** - 减少JavaScript代码,优化ARM架构Docker构建 +- **构建标签** - 移除Docker中不必要的构建标签 + +### 📦 其他更新 (Other Updates) +- **MCP相关** - 更新Model Context Protocol相关功能 +- **依赖更新** - 更新项目依赖和模块配置 + +### 👥 贡献者 (Contributors) +- langhuihui + +--- diff --git a/api.go b/api.go index af9c057..0680645 100644 --- a/api.go +++ b/api.go @@ -180,19 +180,17 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) { var recordings []*pb.RecordingDetail - s.Records.Call(func() error { - for record := range s.Records.Range { - if record.StreamPath == req.StreamPath { - recordings = append(recordings, &pb.RecordingDetail{ - FilePath: record.RecConf.FilePath, - Mode: record.Mode, - Fragment: durationpb.New(record.RecConf.Fragment), - Append: record.RecConf.Append, - PluginName: record.Plugin.Meta.Name, - }) - } + s.Records.SafeRange(func(record *RecordJob) bool { + if record.StreamPath == req.StreamPath { + recordings = append(recordings, &pb.RecordingDetail{ + FilePath: record.RecConf.FilePath, + Mode: record.Mode, + Fragment: durationpb.New(record.RecConf.Fragment), + Append: record.RecConf.Append, + PluginName: record.Plugin.Meta.Name, + }) } - return nil + return true }) if pub, ok := s.Streams.SafeGet(req.StreamPath); ok { res, err = s.getStreamInfo(pub) @@ -260,17 +258,15 @@ func (s *Server) RestartTask(ctx context.Context, req *pb.RequestWithId64) (resp } func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb.RecordingListResponse, err error) { - s.Records.Call(func() error { - resp = &pb.RecordingListResponse{} - for record := range s.Records.Range { - resp.Data = append(resp.Data, &pb.Recording{ - StreamPath: record.StreamPath, - StartTime: timestamppb.New(record.StartTime), - Type: reflect.TypeOf(record.recorder).String(), - Pointer: uint64(record.GetTaskPointer()), - }) - } - return nil + resp = &pb.RecordingListResponse{} + s.Records.SafeRange(func(record *RecordJob) bool { + resp.Data = append(resp.Data, &pb.Recording{ + StreamPath: record.StreamPath, + StartTime: timestamppb.New(record.StartTime), + Type: reflect.TypeOf(record.recorder).String(), + Pointer: uint64(record.GetTaskPointer()), + }) + return true }) return } @@ -490,7 +486,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.S func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) { s.Streams.Call(func() error { if subscriber, ok := s.Subscribers.Get(req.Id); ok { - if pub, ok := s.Streams.SafeGet(req.StreamPath); ok { + if pub, ok := s.Streams.Get(req.StreamPath); ok { subscriber.Publisher.RemoveSubscriber(subscriber) subscriber.StreamPath = req.StreamPath pub.AddSubscriber(subscriber) @@ -516,54 +512,39 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res } func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { - s.Streams.Call(func() error { - if s, ok := s.Streams.SafeGet(req.StreamPath); ok { - s.Pause() - } - return nil - }) + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { + s.Pause() + } return &pb.SuccessResponse{}, err } func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { - s.Streams.Call(func() error { - if s, ok := s.Streams.SafeGet(req.StreamPath); ok { - s.Resume() - } - return nil - }) + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { + s.Resume() + } return &pb.SuccessResponse{}, err } func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) { - s.Streams.Call(func() error { - if s, ok := s.Streams.SafeGet(req.StreamPath); ok { - s.Speed = float64(req.Speed) - s.Scale = float64(req.Speed) - s.Info("set stream speed", "speed", req.Speed) - } - return nil - }) + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { + s.Speed = float64(req.Speed) + s.Scale = float64(req.Speed) + s.Info("set stream speed", "speed", req.Speed) + } return &pb.SuccessResponse{}, err } func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) { - s.Streams.Call(func() error { - if s, ok := s.Streams.SafeGet(req.StreamPath); ok { - s.Seek(time.Unix(int64(req.TimeStamp), 0)) - } - return nil - }) + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { + s.Seek(time.Unix(int64(req.TimeStamp), 0)) + } return &pb.SuccessResponse{}, err } func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { - s.Streams.Call(func() error { - if s, ok := s.Streams.SafeGet(req.StreamPath); ok { - s.Stop(task.ErrStopByUser) - } - return nil - }) + if s, ok := s.Streams.SafeGet(req.StreamPath); ok { + s.Stop(task.ErrStopByUser) + } return &pb.SuccessResponse{}, err } diff --git a/pkg/util/buddy.go b/pkg/util/buddy.go index e5ea517..5d9beb9 100644 --- a/pkg/util/buddy.go +++ b/pkg/util/buddy.go @@ -2,33 +2,55 @@ package util import ( "errors" + "sync" + "unsafe" ) type Buddy struct { - size int - longests []int + size int + longests [BuddySize>>(MinPowerOf2-1) - 1]int + memoryPool [BuddySize]byte + poolStart int64 + lock sync.Mutex // 保护 longests 数组的并发访问 } var ( InValidParameterErr = errors.New("buddy: invalid parameter") NotFoundErr = errors.New("buddy: can't find block") + buddyPool = sync.Pool{ + New: func() interface{} { + return NewBuddy() + }, + } ) +// GetBuddy 从池中获取一个 Buddy 实例 +func GetBuddy() *Buddy { + buddy := buddyPool.Get().(*Buddy) + return buddy +} + +// PutBuddy 将 Buddy 实例放回池中 +func PutBuddy(b *Buddy) { + buddyPool.Put(b) +} + // NewBuddy creates a buddy instance. // If the parameter isn't valid, return the nil and error as well -func NewBuddy(size int) *Buddy { - if !isPowerOf2(size) { - size = fixSize(size) +func NewBuddy() *Buddy { + size := BuddySize >> MinPowerOf2 + ret := &Buddy{ + size: size, } - nodeCount := 2*size - 1 - longests := make([]int, nodeCount) - for nodeSize, i := 2*size, 0; i < nodeCount; i++ { + for nodeSize, i := 2*size, 0; i < len(ret.longests); i++ { if isPowerOf2(i + 1) { nodeSize /= 2 } - longests[i] = nodeSize + ret.longests[i] = nodeSize } - return &Buddy{size, longests} + ret.poolStart = int64(uintptr(unsafe.Pointer(&ret.memoryPool[0]))) + + return ret } // Alloc find a unused block according to the size @@ -42,6 +64,8 @@ func (b *Buddy) Alloc(size int) (offset int, err error) { if !isPowerOf2(size) { size = fixSize(size) } + b.lock.Lock() + defer b.lock.Unlock() if size > b.longests[0] { err = NotFoundErr return @@ -70,6 +94,8 @@ func (b *Buddy) Free(offset int) error { if offset < 0 || offset >= b.size { return InValidParameterErr } + b.lock.Lock() + defer b.lock.Unlock() nodeSize := 1 index := offset + b.size - 1 for ; b.longests[index] != 0; index = parent(index) { diff --git a/pkg/util/rm_enable.go b/pkg/util/rm_enable.go index ac5cb89..4cd027e 100644 --- a/pkg/util/rm_enable.go +++ b/pkg/util/rm_enable.go @@ -3,11 +3,9 @@ package util import ( - "container/list" "fmt" "io" "slices" - "sync" "unsafe" ) @@ -58,53 +56,59 @@ func (r *RecyclableMemory) Recycle() { } } -var ( - memoryPool [BuddySize]byte - buddy = NewBuddy(BuddySize >> MinPowerOf2) - lock sync.Mutex - poolStart = int64(uintptr(unsafe.Pointer(&memoryPool[0]))) - blockPool = list.New() - //EnableCheckSize bool = false -) - type MemoryAllocator struct { allocator *Allocator start int64 memory []byte Size int + buddy *Buddy +} + +// createMemoryAllocator 创建并初始化 MemoryAllocator +func createMemoryAllocator(size int, buddy *Buddy, offset int) *MemoryAllocator { + ret := &MemoryAllocator{ + allocator: NewAllocator(size), + buddy: buddy, + Size: size, + memory: buddy.memoryPool[offset : offset+size], + start: buddy.poolStart + int64(offset), + } + ret.allocator.Init(size) + return ret } func GetMemoryAllocator(size int) (ret *MemoryAllocator) { - lock.Lock() - offset, err := buddy.Alloc(size >> MinPowerOf2) - if blockPool.Len() > 0 { - ret = blockPool.Remove(blockPool.Front()).(*MemoryAllocator) - } else { - ret = &MemoryAllocator{ - allocator: NewAllocator(size), + if size < BuddySize { + requiredSize := size >> MinPowerOf2 + // 循环尝试从池中获取可用的 buddy + for { + buddy := GetBuddy() + offset, err := buddy.Alloc(requiredSize) + PutBuddy(buddy) + if err == nil { + // 分配成功,使用这个 buddy + return createMemoryAllocator(size, buddy, offset<> MinPowerOf2)) + if ma.buddy != nil { + _ = ma.buddy.Free(int((ma.buddy.poolStart - ma.start) >> MinPowerOf2)) + ma.buddy = nil + } ma.memory = nil - lock.Unlock() } func (ma *MemoryAllocator) Find(size int) (memory []byte) { diff --git a/plugin/debug/chart.go b/plugin/debug/chart.go index eab2258..953d6f8 100644 --- a/plugin/debug/chart.go +++ b/plugin/debug/chart.go @@ -15,6 +15,7 @@ import ( "github.com/gorilla/websocket" "github.com/shirou/gopsutil/v4/cpu" "github.com/shirou/gopsutil/v4/process" + "m7s.live/v5/pkg/task" ) //go:embed static/* @@ -40,8 +41,17 @@ type consumer struct { } type server struct { + task.TickTask consumers []consumer consumersMutex sync.RWMutex + data DataStorage + lastPause uint32 + dataMutex sync.RWMutex + lastConsumerID uint + upgrader websocket.Upgrader + prevSysTime float64 + prevUserTime float64 + myProcess *process.Process } type SimplePair struct { @@ -75,99 +85,91 @@ const ( maxCount int = 86400 ) -var ( - data DataStorage - lastPause uint32 - mutex sync.RWMutex - lastConsumerID uint - s server - upgrader = websocket.Upgrader{ +func (s *server) Start() error { + var err error + s.myProcess, err = process.NewProcess(int32(os.Getpid())) + if err != nil { + log.Printf("Failed to get process: %v", err) + } + // 初始化 WebSocket upgrader + s.upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } - prevSysTime float64 - prevUserTime float64 - myProcess *process.Process -) - -func init() { - - myProcess, _ = process.NewProcess(int32(os.Getpid())) - // preallocate arrays in data, helps save on reallocations caused by append() // when maxCount is large - data.BytesAllocated = make([]SimplePair, 0, maxCount) - data.GcPauses = make([]SimplePair, 0, maxCount) - data.CPUUsage = make([]CPUPair, 0, maxCount) - data.Pprof = make([]PprofPair, 0, maxCount) - - go s.gatherData() + s.data.BytesAllocated = make([]SimplePair, 0, maxCount) + s.data.GcPauses = make([]SimplePair, 0, maxCount) + s.data.CPUUsage = make([]CPUPair, 0, maxCount) + s.data.Pprof = make([]PprofPair, 0, maxCount) + return s.TickTask.Start() } -func (s *server) gatherData() { - timer := time.Tick(time.Second) +func (s *server) GetTickInterval() time.Duration { + return time.Second +} - for now := range timer { - nowUnix := now.Unix() +func (s *server) Tick(any) { + now := time.Now() + nowUnix := now.Unix() - var ms runtime.MemStats - runtime.ReadMemStats(&ms) + var ms runtime.MemStats + runtime.ReadMemStats(&ms) - u := update{ - Ts: nowUnix * 1000, - Block: pprof.Lookup("block").Count(), - Goroutine: pprof.Lookup("goroutine").Count(), - Heap: pprof.Lookup("heap").Count(), - Mutex: pprof.Lookup("mutex").Count(), - Threadcreate: pprof.Lookup("threadcreate").Count(), - } - data.Pprof = append(data.Pprof, PprofPair{ - uint64(nowUnix) * 1000, - u.Block, - u.Goroutine, - u.Heap, - u.Mutex, - u.Threadcreate, - }) - - cpuTimes, err := myProcess.Times() - if err != nil { - cpuTimes = &cpu.TimesStat{} - } - - if prevUserTime != 0 { - u.CPUUser = cpuTimes.User - prevUserTime - u.CPUSys = cpuTimes.System - prevSysTime - data.CPUUsage = append(data.CPUUsage, CPUPair{uint64(nowUnix) * 1000, u.CPUUser, u.CPUSys}) - } - - prevUserTime = cpuTimes.User - prevSysTime = cpuTimes.System - - mutex.Lock() - - bytesAllocated := ms.Alloc - u.BytesAllocated = bytesAllocated - data.BytesAllocated = append(data.BytesAllocated, SimplePair{uint64(nowUnix) * 1000, bytesAllocated}) - if lastPause == 0 || lastPause != ms.NumGC { - gcPause := ms.PauseNs[(ms.NumGC+255)%256] - u.GcPause = gcPause - data.GcPauses = append(data.GcPauses, SimplePair{uint64(nowUnix) * 1000, gcPause}) - lastPause = ms.NumGC - } - - if len(data.BytesAllocated) > maxCount { - data.BytesAllocated = data.BytesAllocated[len(data.BytesAllocated)-maxCount:] - } - - if len(data.GcPauses) > maxCount { - data.GcPauses = data.GcPauses[len(data.GcPauses)-maxCount:] - } - - mutex.Unlock() - - s.sendToConsumers(u) + u := update{ + Ts: nowUnix * 1000, + Block: pprof.Lookup("block").Count(), + Goroutine: pprof.Lookup("goroutine").Count(), + Heap: pprof.Lookup("heap").Count(), + Mutex: pprof.Lookup("mutex").Count(), + Threadcreate: pprof.Lookup("threadcreate").Count(), } + s.data.Pprof = append(s.data.Pprof, PprofPair{ + uint64(nowUnix) * 1000, + u.Block, + u.Goroutine, + u.Heap, + u.Mutex, + u.Threadcreate, + }) + + cpuTimes, err := s.myProcess.Times() + if err != nil { + cpuTimes = &cpu.TimesStat{} + } + + if s.prevUserTime != 0 { + u.CPUUser = cpuTimes.User - s.prevUserTime + u.CPUSys = cpuTimes.System - s.prevSysTime + s.data.CPUUsage = append(s.data.CPUUsage, CPUPair{uint64(nowUnix) * 1000, u.CPUUser, u.CPUSys}) + } + + s.prevUserTime = cpuTimes.User + s.prevSysTime = cpuTimes.System + + s.dataMutex.Lock() + + bytesAllocated := ms.Alloc + u.BytesAllocated = bytesAllocated + s.data.BytesAllocated = append(s.data.BytesAllocated, SimplePair{uint64(nowUnix) * 1000, bytesAllocated}) + if s.lastPause == 0 || s.lastPause != ms.NumGC { + gcPause := ms.PauseNs[(ms.NumGC+255)%256] + u.GcPause = gcPause + s.data.GcPauses = append(s.data.GcPauses, SimplePair{uint64(nowUnix) * 1000, gcPause}) + s.lastPause = ms.NumGC + } + + if len(s.data.BytesAllocated) > maxCount { + s.data.BytesAllocated = s.data.BytesAllocated[len(s.data.BytesAllocated)-maxCount:] + } + + if len(s.data.GcPauses) > maxCount { + s.data.GcPauses = s.data.GcPauses[len(s.data.GcPauses)-maxCount:] + } + + s.dataMutex.Unlock() + + s.sendToConsumers(u) } func (s *server) sendToConsumers(u update) { @@ -203,10 +205,10 @@ func (s *server) addConsumer() consumer { s.consumersMutex.Lock() defer s.consumersMutex.Unlock() - lastConsumerID++ + s.lastConsumerID++ c := consumer{ - id: lastConsumerID, + id: s.lastConsumerID, c: make(chan update), } @@ -221,7 +223,7 @@ func (s *server) dataFeedHandler(w http.ResponseWriter, r *http.Request) { lastPong time.Time ) - conn, err := upgrader.Upgrade(w, r, nil) + conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return @@ -268,9 +270,9 @@ func (s *server) dataFeedHandler(w http.ResponseWriter, r *http.Request) { } } -func dataHandler(w http.ResponseWriter, r *http.Request) { - mutex.RLock() - defer mutex.RUnlock() +func (s *server) dataHandler(w http.ResponseWriter, r *http.Request) { + s.dataMutex.RLock() + defer s.dataMutex.RUnlock() if e := r.ParseForm(); e != nil { log.Print("error parsing form") @@ -284,7 +286,7 @@ func dataHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") encoder := json.NewEncoder(w) - encoder.Encode(data) + encoder.Encode(s.data) fmt.Fprint(w, ")") } diff --git a/plugin/debug/index.go b/plugin/debug/index.go index 3d2b5ee..d03cb58 100644 --- a/plugin/debug/index.go +++ b/plugin/debug/index.go @@ -34,13 +34,13 @@ type DebugPlugin struct { m7s.Plugin ProfileDuration time.Duration `default:"10s" desc:"profile持续时间"` Profile string `desc:"采集profile存储文件"` - ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"` Grfout string `default:"grf.out" desc:"grf输出文件"` - + EnableChart bool `default:"true" desc:"是否启用图表功能"` // 添加缓存字段 cpuProfileData *profile.Profile // 缓存 CPU Profile 数据 cpuProfileOnce sync.Once // 确保只采集一次 cpuProfileLock sync.Mutex // 保护缓存数据 + chartServer server } type WriteToFile struct { @@ -72,6 +72,9 @@ func (p *DebugPlugin) OnInit() error { p.Info("cpu profile done") }() } + if p.EnableChart { + p.AddTask(&p.chartServer) + } return nil } @@ -100,11 +103,11 @@ func (p *DebugPlugin) Charts_(w http.ResponseWriter, r *http.Request) { } func (p *DebugPlugin) Charts_data(w http.ResponseWriter, r *http.Request) { - dataHandler(w, r) + p.chartServer.dataHandler(w, r) } func (p *DebugPlugin) Charts_datafeed(w http.ResponseWriter, r *http.Request) { - s.dataFeedHandler(w, r) + p.chartServer.dataFeedHandler(w, r) } func (p *DebugPlugin) Grf(w http.ResponseWriter, r *http.Request) { diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index d20a444..74a4e7b 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -2468,12 +2468,9 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus resp.Message = fmt.Sprintf("发送暂停请求失败: %v", err) return resp, nil } - gb.Server.Streams.Call(func() error { - if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { - s.Pause() - } - return nil - }) + if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok { + s.Pause() + } gb.Info("暂停回放", "streampath", req.StreamPath) @@ -2522,12 +2519,9 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes resp.Message = fmt.Sprintf("发送恢复请求失败: %v", err) return resp, nil } - gb.Server.Streams.Call(func() error { - if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { - s.Resume() - } - return nil - }) + if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok { + s.Resume() + } gb.Info("恢复回放", "streampath", req.StreamPath) @@ -2595,14 +2589,11 @@ func (gb *GB28181Plugin) PlaybackSpeed(ctx context.Context, req *pb.PlaybackSpee // 发送请求 _, err := dialog.session.TransactionRequest(ctx, request) - gb.Server.Streams.Call(func() error { - if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { - s.Speed = float64(req.Speed) - s.Scale = float64(req.Speed) - s.Info("set stream speed", "speed", req.Speed) - } - return nil - }) + if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok { + s.Speed = float64(req.Speed) + s.Scale = float64(req.Speed) + s.Info("set stream speed", "speed", req.Speed) + } if err != nil { resp.Code = 500 resp.Message = fmt.Sprintf("发送倍速请求失败: %v", err) diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index 65372d9..e5faa22 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -458,11 +458,8 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) filePath = req.FilePath } res = &mp4pb.ResponseStartRecord{} - p.Server.Records.Call(func() error { - _, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool { - return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath - }) - return nil + _, recordExists = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool { + return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath }) if recordExists { err = pkg.ErrRecordExists @@ -485,19 +482,16 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) func (p *MP4Plugin) StopRecord(ctx context.Context, req *mp4pb.ReqStopRecord) (res *mp4pb.ResponseStopRecord, err error) { res = &mp4pb.ResponseStopRecord{} var recordJob *m7s.RecordJob - p.Server.Records.Call(func() error { - recordJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool { - return job.StreamPath == req.StreamPath - }) - if recordJob != nil { - t := recordJob.GetTask() - if t != nil { - res.Data = uint64(uintptr(unsafe.Pointer(t))) - t.Stop(task.ErrStopByUser) - } - } - return nil + recordJob, _ = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool { + return job.StreamPath == req.StreamPath }) + if recordJob != nil { + t := recordJob.GetTask() + if t != nil { + res.Data = uint64(uintptr(unsafe.Pointer(t))) + t.Stop(task.ErrStopByUser) + } + } return } @@ -519,11 +513,8 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) ( } //recorder := p.Meta.Recorder(config.Record{}) var tmpJob *m7s.RecordJob - p.Server.Records.Call(func() error { - tmpJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool { - return job.StreamPath == req.StreamPath - }) - return nil + tmpJob, _ = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool { + return job.StreamPath == req.StreamPath }) if tmpJob == nil { //为空表示没有正在进行的录制,也就是没有自动录像,则进行正常的事件录像 if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok { diff --git a/plugin/rtmp/pkg/handshake.go b/plugin/rtmp/pkg/handshake.go index f9c85b1..858b738 100644 --- a/plugin/rtmp/pkg/handshake.go +++ b/plugin/rtmp/pkg/handshake.go @@ -79,10 +79,10 @@ func (nc *NetConnection) Handshake(checkC2 bool) (err error) { if len(C1) != C1S1_SIZE { return errors.New("C1 Error") } - var ts int - util.GetBE(C1[4:8], &ts) + var zero int + util.GetBE(C1[4:8], &zero) - if ts == 0 { + if zero == 0 { return nc.simple_handshake(C1, checkC2) } @@ -92,12 +92,26 @@ func (nc *NetConnection) Handshake(checkC2 bool) (err error) { func (nc *NetConnection) ClientHandshake() (err error) { C0C1 := nc.mediaDataPool.NextN(C1S1_SIZE + 1) defer nc.mediaDataPool.Recycle() + + // 构造 C0 C0C1[0] = RTMP_HANDSHAKE_VERSION + + // 构造 C1 使用简单握手格式 + C1 := C0C1[1:] + // Time (4 bytes): 当前时间戳 + util.PutBE(C1[0:4], time.Now().Unix()&0xFFFFFFFF) + // Zero (4 bytes): 必须为 0,确保使用简单握手 + util.PutBE(C1[4:8], 0) + // Random data (1528 bytes): 填充随机数据 + for i := 8; i < C1S1_SIZE; i++ { + C1[i] = byte(rand.Int() % 256) + } + if _, err = nc.Write(C0C1); err == nil { // read S0 S1 if _, err = io.ReadFull(nc.Conn, C0C1); err == nil { if C0C1[0] != RTMP_HANDSHAKE_VERSION { - err = errors.New("S1 C1 Error") + err = errors.New("S0 Error") // C2 } else if _, err = nc.Write(C0C1[1:]); err == nil { _, err = io.ReadFull(nc.Conn, C0C1[1:]) // S2 @@ -222,13 +236,7 @@ func clientScheme(C1 []byte, schem int) (scheme int, challenge []byte, digest [] return 0, nil, nil, false, err } - // ok - if bytes.Compare(digest, tmp_Hash) == 0 { - ok = true - } else { - ok = false - } - + ok = bytes.Equal(digest, tmp_Hash) // challenge scheme challenge = C1[key_offset : key_offset+C1S1_KEY_DATA_SIZE] scheme = schem diff --git a/prometheus.go b/prometheus.go index 78a9ba0..f57b04f 100644 --- a/prometheus.go +++ b/prometheus.go @@ -76,13 +76,10 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(s.prometheusDesc.Net.ReceiveSpeed, prometheus.GaugeValue, float64(net.ReceiveSpeed), net.Name) } } - s.Call(func() error { - for stream := range s.Streams.Range { - ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "video") - ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "video") - ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio") - ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio") - } - return nil - }) + for stream := range s.Streams.SafeRange { + ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "video") + ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "video") + ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio") + ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio") + } }