fix: use SafeGet instead of Call and Get

feat: multi-buddy support
langhuihui
2025-06-05 20:33:59 +08:00
parent 8ff14931fe
commit 80e19726d4
10 changed files with 368 additions and 254 deletions

RELEASE_NOTES_5.0.x_CN.md (new file, 111 lines)

@@ -0,0 +1,111 @@
# Monibuca v5.0.x Release Notes
## v5.0.2 (2025-06-05)
### 🎉 New Features
#### Core Features
- **Lower latency** - Disabled replay protection for WebRTC over TCP, reducing latency
- **Configuration system enhancements** - Config keys now accept more formats (`-`, `_`, and uppercase letters may appear in key names), improving configuration flexibility (see the sketch after this list)
- **Raw-data check** - Added a no-frame check for raw data, improving data-processing stability
- **MP4 loop playback** - MP4 files can be read in a loop (enabled via the `loop` option under the `pull` config)
- **S3 plugin** - Added an S3 storage plugin for cloud-storage integration
- **TCP read/write buffer config** - Added read/write buffer size options for TCP connections, improving throughput under high concurrency
- **Pull test mode** - Added a pull test mode (pull without publishing), making debugging and testing easier
- **SEI API format extension** - Extended the SEI API to support more data formats
- **More hooks** - Added more hook callback points for better extensibility
- **Crontab plugin** - Added a crontab scheduled-task plugin
- **Server packet capture** - Added server-side packet capture (invokes `tcpdump`), supporting TCP and UDP; see the [tcpdump](https://api.monibuca.com/api-301117332) API docs
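
The looser key matching can be pictured as a normalization step applied before lookup. The sketch below only illustrates that idea and is not Monibuca's actual implementation; the `normalizeKey` helper and the sample keys are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeKey is a hypothetical helper: it lowercases a key and strips
// '-' and '_' so that enable-chart, enable_chart and EnableChart all
// resolve to the same canonical name before the config value is looked up.
func normalizeKey(key string) string {
	key = strings.ToLower(key)
	key = strings.ReplaceAll(key, "-", "")
	return strings.ReplaceAll(key, "_", "")
}

func main() {
	for _, k := range []string{"enable-chart", "enable_chart", "EnableChart"} {
		fmt.Println(k, "->", normalizeKey(k)) // all print "enablechart"
	}
}
```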
#### GB28181 Protocol Enhancements
- **Platform configuration** - GB28181 now supports adding platforms and platform channels in config.yaml
- **Sub-stream playback** - Added support for playing GB28181 sub-streams
- **SDP optimization** - Improved handling of mediaip and sipip in the invite SDP
- **Local port persistence** - Fixed saving the GB28181 local port to the database
#### MP4 Enhancements
- **FLV download** - Recorded MP4 files can be downloaded in FLV format
- **Download fixes** - Fixed issues with MP4 download
- **Recovery fix** - Fixed MP4 recovery
### 🐛 Bug Fixes
#### Networking
- **TCP read blocking** - Fixed TCP reads blocking by adding a read timeout
- **RTSP memory leak** - Fixed a memory leak in the RTSP protocol
- **RTSP audio/video flags** - Fixed RTSP streams missing the audio or video flag
#### GB28181 Protocol
- **Task management** - Use task.Manager to fix the register handler issue
- **Plan length** - Fixed the plan.length 168 issue
- **Registration rate** - Fixed overly fast GB28181 registration starting too many tasks
- **Contact info** - Fixed GB28181 fetching the wrong contact information
#### RTMP Protocol
- **Timestamp handling** - Fixed the timestamp jump at the start of RTMP streams
### 🛠️ Improvements
#### Docker Support
- **tcpdump tool** - Added the tcpdump network diagnostic tool to the Docker image
#### Linux Platform
- **SIP request optimization** - Removed the via header from SIP requests on Linux
### 👥 Contributors
- langhuihui
- pggiroro
- banshan
---
## v5.0.1 (2025-05-21)
### 🎉 New Features
#### WebRTC Enhancements
- **H265 support** - Added WebRTC support for H265, improving video quality and compression efficiency
#### GB28181 Protocol Enhancements
- **Subscription extensions** - The GB28181 module can now subscribe to alarms, mobile position, and catalog information
- **Notify requests** - Added support for receiving Notify requests, improving interaction with devices
#### Docker Improvements
- **FFmpeg integration** - Added FFmpeg to the Docker image, supporting more audio/video processing scenarios
- **Multi-arch support** - Added multi-architecture Docker builds
### 🐛 Bug Fixes
#### Docker
- **Build issues** - Fixed several problems in the Docker build process
- **Build optimization** - Streamlined the Docker build to improve efficiency
#### RTMP Protocol
- **Timestamp handling** - Fixed the first RTMP type-3 chunk needing an added timestamp
#### GB28181 Protocol
- **Path matching** - Fixed the regular expression that matches playback stream paths in the GB28181 module
#### MP4 Handling
- **stsz box** - Fixed the sample size in the stsz box
- **G711 audio** - Fixed reading G711 audio when pulling MP4 files
- **H265 parsing** - Fixed parsing of H265 MP4 files
### 🛠️ Improvements
#### Code Quality
- **Error handling** - Added a maxcount error-handling mechanism
- **Documentation** - Updated the README and go.mod configuration
#### Build System
- **ARM architecture** - Reduced JavaScript code to optimize ARM Docker builds
- **Build tags** - Removed unnecessary build tags from Docker
### 📦 Other Updates
- **MCP** - Updated Model Context Protocol related functionality
- **Dependencies** - Updated project dependencies and module configuration
### 👥 Contributors
- langhuihui
---

api.go (93 changed lines)

@@ -180,19 +180,17 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err
func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) {
var recordings []*pb.RecordingDetail
s.Records.Call(func() error {
for record := range s.Records.Range {
if record.StreamPath == req.StreamPath {
recordings = append(recordings, &pb.RecordingDetail{
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
})
}
s.Records.SafeRange(func(record *RecordJob) bool {
if record.StreamPath == req.StreamPath {
recordings = append(recordings, &pb.RecordingDetail{
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
})
}
return nil
return true
})
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok {
res, err = s.getStreamInfo(pub)
@@ -260,17 +258,15 @@ func (s *Server) RestartTask(ctx context.Context, req *pb.RequestWithId64) (resp
}
func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb.RecordingListResponse, err error) {
s.Records.Call(func() error {
resp = &pb.RecordingListResponse{}
for record := range s.Records.Range {
resp.Data = append(resp.Data, &pb.Recording{
StreamPath: record.StreamPath,
StartTime: timestamppb.New(record.StartTime),
Type: reflect.TypeOf(record.recorder).String(),
Pointer: uint64(record.GetTaskPointer()),
})
}
return nil
resp = &pb.RecordingListResponse{}
s.Records.SafeRange(func(record *RecordJob) bool {
resp.Data = append(resp.Data, &pb.Recording{
StreamPath: record.StreamPath,
StartTime: timestamppb.New(record.StartTime),
Type: reflect.TypeOf(record.recorder).String(),
Pointer: uint64(record.GetTaskPointer()),
})
return true
})
return
}
@@ -490,7 +486,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.S
func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if subscriber, ok := s.Subscribers.Get(req.Id); ok {
if pub, ok := s.Streams.SafeGet(req.StreamPath); ok {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
subscriber.Publisher.RemoveSubscriber(subscriber)
subscriber.StreamPath = req.StreamPath
pub.AddSubscriber(subscriber)
@@ -516,54 +512,39 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res
}
func (s *Server) PauseStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Pause()
}
return nil
})
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Pause()
}
return &pb.SuccessResponse{}, err
}
func (s *Server) ResumeStream(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Resume()
}
return nil
})
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Resume()
}
return &pb.SuccessResponse{}, err
}
func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
return nil
})
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
return &pb.SuccessResponse{}, err
}
func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Seek(time.Unix(int64(req.TimeStamp), 0))
}
return nil
})
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Seek(time.Unix(int64(req.TimeStamp), 0))
}
return &pb.SuccessResponse{}, err
}
func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error {
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Stop(task.ErrStopByUser)
}
return nil
})
if s, ok := s.Streams.SafeGet(req.StreamPath); ok {
s.Stop(task.ErrStopByUser)
}
return &pb.SuccessResponse{}, err
}
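
The pattern behind this change: instead of wrapping every read in `Call(func() error { ... })`, the collection's own `SafeGet`/`SafeRange`/`SafeFind` helpers take the lock internally. A minimal sketch of that idea is below; the generic `Collection` type is a stand-in with assumed locking semantics, not the real `util.Collection` used by the server.

```go
package main

import "sync"

// Collection is a stand-in for a lock-protected map of streams; the real
// util.Collection in this repository has a richer API.
type Collection[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

// SafeGet looks a value up while holding the lock, so callers no longer
// need to schedule a Call closure just to read.
func (c *Collection[K, V]) SafeGet(key K) (v V, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok = c.items[key]
	return
}

// SafeRange visits every value under the lock; returning false from the
// callback stops the iteration early, mirroring the diff above.
func (c *Collection[K, V]) SafeRange(f func(V) bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, v := range c.items {
		if !f(v) {
			return
		}
	}
}

func main() {
	c := &Collection[string, int]{items: map[string]int{"live/test": 1}}
	if v, ok := c.SafeGet("live/test"); ok {
		_ = v // direct, lock-safe read
	}
	c.SafeRange(func(v int) bool { return true })
}
```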

View File

@@ -2,33 +2,55 @@ package util
import (
"errors"
"sync"
"unsafe"
)
type Buddy struct {
size int
longests []int
size int
longests [BuddySize>>(MinPowerOf2-1) - 1]int
memoryPool [BuddySize]byte
poolStart int64
lock sync.Mutex // protects concurrent access to the longests array
}
var (
InValidParameterErr = errors.New("buddy: invalid parameter")
NotFoundErr = errors.New("buddy: can't find block")
buddyPool = sync.Pool{
New: func() interface{} {
return NewBuddy()
},
}
)
// GetBuddy takes a Buddy instance from the pool
func GetBuddy() *Buddy {
buddy := buddyPool.Get().(*Buddy)
return buddy
}
// PutBuddy returns a Buddy instance to the pool
func PutBuddy(b *Buddy) {
buddyPool.Put(b)
}
// NewBuddy creates a buddy instance.
// If the parameter isn't valid, return the nil and error as well
func NewBuddy(size int) *Buddy {
if !isPowerOf2(size) {
size = fixSize(size)
func NewBuddy() *Buddy {
size := BuddySize >> MinPowerOf2
ret := &Buddy{
size: size,
}
nodeCount := 2*size - 1
longests := make([]int, nodeCount)
for nodeSize, i := 2*size, 0; i < nodeCount; i++ {
for nodeSize, i := 2*size, 0; i < len(ret.longests); i++ {
if isPowerOf2(i + 1) {
nodeSize /= 2
}
longests[i] = nodeSize
ret.longests[i] = nodeSize
}
return &Buddy{size, longests}
ret.poolStart = int64(uintptr(unsafe.Pointer(&ret.memoryPool[0])))
return ret
}
// Alloc find a unused block according to the size
@@ -42,6 +64,8 @@ func (b *Buddy) Alloc(size int) (offset int, err error) {
if !isPowerOf2(size) {
size = fixSize(size)
}
b.lock.Lock()
defer b.lock.Unlock()
if size > b.longests[0] {
err = NotFoundErr
return
@@ -70,6 +94,8 @@ func (b *Buddy) Free(offset int) error {
if offset < 0 || offset >= b.size {
return InValidParameterErr
}
b.lock.Lock()
defer b.lock.Unlock()
nodeSize := 1
index := offset + b.size - 1
for ; b.longests[index] != 0; index = parent(index) {
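
A short usage sketch of the pooled buddy allocator, using only the functions visible in this diff (`GetBuddy`, `PutBuddy`, `Alloc`, `Free`) and assuming it lives alongside them in the util package; the block count is an arbitrary example.

```go
// useBuddy is a sketch that exercises the buddy API shown above.
// Alloc works in MinPowerOf2-sized blocks and returns a block offset;
// Free releases the block at that offset so neighbouring blocks can merge again.
func useBuddy() error {
	b := GetBuddy()   // take a Buddy from the sync.Pool
	defer PutBuddy(b) // return it once we are done with it

	offset, err := b.Alloc(4) // request 4 blocks (rounded up to a power of two)
	if err != nil {
		return err // NotFoundErr when no free block is large enough
	}
	defer b.Free(offset)
	return nil
}
```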

View File

@@ -3,11 +3,9 @@
package util
import (
"container/list"
"fmt"
"io"
"slices"
"sync"
"unsafe"
)
@@ -58,53 +56,59 @@ func (r *RecyclableMemory) Recycle() {
}
}
var (
memoryPool [BuddySize]byte
buddy = NewBuddy(BuddySize >> MinPowerOf2)
lock sync.Mutex
poolStart = int64(uintptr(unsafe.Pointer(&memoryPool[0])))
blockPool = list.New()
//EnableCheckSize bool = false
)
type MemoryAllocator struct {
allocator *Allocator
start int64
memory []byte
Size int
buddy *Buddy
}
// createMemoryAllocator creates and initializes a MemoryAllocator
func createMemoryAllocator(size int, buddy *Buddy, offset int) *MemoryAllocator {
ret := &MemoryAllocator{
allocator: NewAllocator(size),
buddy: buddy,
Size: size,
memory: buddy.memoryPool[offset : offset+size],
start: buddy.poolStart + int64(offset),
}
ret.allocator.Init(size)
return ret
}
func GetMemoryAllocator(size int) (ret *MemoryAllocator) {
lock.Lock()
offset, err := buddy.Alloc(size >> MinPowerOf2)
if blockPool.Len() > 0 {
ret = blockPool.Remove(blockPool.Front()).(*MemoryAllocator)
} else {
ret = &MemoryAllocator{
allocator: NewAllocator(size),
if size < BuddySize {
requiredSize := size >> MinPowerOf2
// keep trying to get a usable buddy from the pool
for {
buddy := GetBuddy()
offset, err := buddy.Alloc(requiredSize)
PutBuddy(buddy)
if err == nil {
// allocation succeeded; use this buddy
return createMemoryAllocator(size, buddy, offset<<MinPowerOf2)
}
}
}
lock.Unlock()
ret.Size = size
ret.allocator.Init(size)
if err != nil {
ret.memory = make([]byte, size)
ret.start = int64(uintptr(unsafe.Pointer(&ret.memory[0])))
return
// no pooled buddy can satisfy the request, or the size is too large; fall back to system memory
memory := make([]byte, size)
start := int64(uintptr(unsafe.Pointer(&memory[0])))
return &MemoryAllocator{
allocator: NewAllocator(size),
Size: size,
memory: memory,
start: start,
}
offset = offset << MinPowerOf2
ret.memory = memoryPool[offset : offset+size]
ret.start = poolStart + int64(offset)
return
}
func (ma *MemoryAllocator) Recycle() {
ma.allocator.Recycle()
lock.Lock()
blockPool.PushBack(ma)
_ = buddy.Free(int((poolStart - ma.start) >> MinPowerOf2))
if ma.buddy != nil {
_ = ma.buddy.Free(int((ma.buddy.poolStart - ma.start) >> MinPowerOf2))
ma.buddy = nil
}
ma.memory = nil
lock.Unlock()
}
func (ma *MemoryAllocator) Find(size int) (memory []byte) {
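
How the reworked allocator is intended to be used, based on the functions in this diff: `GetMemoryAllocator` returns an allocator backed by a pooled `Buddy` when one has room, or by a plain heap slice otherwise, and `Recycle` gives the block back. A sketch assuming it sits in the same util package; the sizes are arbitrary and the `Find` call is based only on the signature shown above.

```go
// useMemoryAllocator is a sketch of the allocate/recycle lifecycle.
func useMemoryAllocator() {
	ma := GetMemoryAllocator(1 << 16) // 64 KiB, served from a pooled Buddy when possible
	defer ma.Recycle()                // frees the buddy block, or drops the heap slice

	buf := ma.Find(1500) // Find returns a []byte, per the signature in the diff
	_ = buf
}
```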

View File

@@ -15,6 +15,7 @@ import (
"github.com/gorilla/websocket"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/process"
"m7s.live/v5/pkg/task"
)
//go:embed static/*
@@ -40,8 +41,17 @@ type consumer struct {
}
type server struct {
task.TickTask
consumers []consumer
consumersMutex sync.RWMutex
data DataStorage
lastPause uint32
dataMutex sync.RWMutex
lastConsumerID uint
upgrader websocket.Upgrader
prevSysTime float64
prevUserTime float64
myProcess *process.Process
}
type SimplePair struct {
@@ -75,99 +85,91 @@ const (
maxCount int = 86400
)
var (
data DataStorage
lastPause uint32
mutex sync.RWMutex
lastConsumerID uint
s server
upgrader = websocket.Upgrader{
func (s *server) Start() error {
var err error
s.myProcess, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
log.Printf("Failed to get process: %v", err)
}
// initialize the WebSocket upgrader
s.upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
prevSysTime float64
prevUserTime float64
myProcess *process.Process
)
func init() {
myProcess, _ = process.NewProcess(int32(os.Getpid()))
// preallocate arrays in data, helps save on reallocations caused by append()
// when maxCount is large
data.BytesAllocated = make([]SimplePair, 0, maxCount)
data.GcPauses = make([]SimplePair, 0, maxCount)
data.CPUUsage = make([]CPUPair, 0, maxCount)
data.Pprof = make([]PprofPair, 0, maxCount)
go s.gatherData()
s.data.BytesAllocated = make([]SimplePair, 0, maxCount)
s.data.GcPauses = make([]SimplePair, 0, maxCount)
s.data.CPUUsage = make([]CPUPair, 0, maxCount)
s.data.Pprof = make([]PprofPair, 0, maxCount)
return s.TickTask.Start()
}
func (s *server) gatherData() {
timer := time.Tick(time.Second)
func (s *server) GetTickInterval() time.Duration {
return time.Second
}
for now := range timer {
nowUnix := now.Unix()
func (s *server) Tick(any) {
now := time.Now()
nowUnix := now.Unix()
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
u := update{
Ts: nowUnix * 1000,
Block: pprof.Lookup("block").Count(),
Goroutine: pprof.Lookup("goroutine").Count(),
Heap: pprof.Lookup("heap").Count(),
Mutex: pprof.Lookup("mutex").Count(),
Threadcreate: pprof.Lookup("threadcreate").Count(),
}
data.Pprof = append(data.Pprof, PprofPair{
uint64(nowUnix) * 1000,
u.Block,
u.Goroutine,
u.Heap,
u.Mutex,
u.Threadcreate,
})
cpuTimes, err := myProcess.Times()
if err != nil {
cpuTimes = &cpu.TimesStat{}
}
if prevUserTime != 0 {
u.CPUUser = cpuTimes.User - prevUserTime
u.CPUSys = cpuTimes.System - prevSysTime
data.CPUUsage = append(data.CPUUsage, CPUPair{uint64(nowUnix) * 1000, u.CPUUser, u.CPUSys})
}
prevUserTime = cpuTimes.User
prevSysTime = cpuTimes.System
mutex.Lock()
bytesAllocated := ms.Alloc
u.BytesAllocated = bytesAllocated
data.BytesAllocated = append(data.BytesAllocated, SimplePair{uint64(nowUnix) * 1000, bytesAllocated})
if lastPause == 0 || lastPause != ms.NumGC {
gcPause := ms.PauseNs[(ms.NumGC+255)%256]
u.GcPause = gcPause
data.GcPauses = append(data.GcPauses, SimplePair{uint64(nowUnix) * 1000, gcPause})
lastPause = ms.NumGC
}
if len(data.BytesAllocated) > maxCount {
data.BytesAllocated = data.BytesAllocated[len(data.BytesAllocated)-maxCount:]
}
if len(data.GcPauses) > maxCount {
data.GcPauses = data.GcPauses[len(data.GcPauses)-maxCount:]
}
mutex.Unlock()
s.sendToConsumers(u)
u := update{
Ts: nowUnix * 1000,
Block: pprof.Lookup("block").Count(),
Goroutine: pprof.Lookup("goroutine").Count(),
Heap: pprof.Lookup("heap").Count(),
Mutex: pprof.Lookup("mutex").Count(),
Threadcreate: pprof.Lookup("threadcreate").Count(),
}
s.data.Pprof = append(s.data.Pprof, PprofPair{
uint64(nowUnix) * 1000,
u.Block,
u.Goroutine,
u.Heap,
u.Mutex,
u.Threadcreate,
})
cpuTimes, err := s.myProcess.Times()
if err != nil {
cpuTimes = &cpu.TimesStat{}
}
if s.prevUserTime != 0 {
u.CPUUser = cpuTimes.User - s.prevUserTime
u.CPUSys = cpuTimes.System - s.prevSysTime
s.data.CPUUsage = append(s.data.CPUUsage, CPUPair{uint64(nowUnix) * 1000, u.CPUUser, u.CPUSys})
}
s.prevUserTime = cpuTimes.User
s.prevSysTime = cpuTimes.System
s.dataMutex.Lock()
bytesAllocated := ms.Alloc
u.BytesAllocated = bytesAllocated
s.data.BytesAllocated = append(s.data.BytesAllocated, SimplePair{uint64(nowUnix) * 1000, bytesAllocated})
if s.lastPause == 0 || s.lastPause != ms.NumGC {
gcPause := ms.PauseNs[(ms.NumGC+255)%256]
u.GcPause = gcPause
s.data.GcPauses = append(s.data.GcPauses, SimplePair{uint64(nowUnix) * 1000, gcPause})
s.lastPause = ms.NumGC
}
if len(s.data.BytesAllocated) > maxCount {
s.data.BytesAllocated = s.data.BytesAllocated[len(s.data.BytesAllocated)-maxCount:]
}
if len(s.data.GcPauses) > maxCount {
s.data.GcPauses = s.data.GcPauses[len(s.data.GcPauses)-maxCount:]
}
s.dataMutex.Unlock()
s.sendToConsumers(u)
}
func (s *server) sendToConsumers(u update) {
@@ -203,10 +205,10 @@ func (s *server) addConsumer() consumer {
s.consumersMutex.Lock()
defer s.consumersMutex.Unlock()
lastConsumerID++
s.lastConsumerID++
c := consumer{
id: lastConsumerID,
id: s.lastConsumerID,
c: make(chan update),
}
@@ -221,7 +223,7 @@ func (s *server) dataFeedHandler(w http.ResponseWriter, r *http.Request) {
lastPong time.Time
)
conn, err := upgrader.Upgrade(w, r, nil)
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
@@ -268,9 +270,9 @@ func (s *server) dataFeedHandler(w http.ResponseWriter, r *http.Request) {
}
}
func dataHandler(w http.ResponseWriter, r *http.Request) {
mutex.RLock()
defer mutex.RUnlock()
func (s *server) dataHandler(w http.ResponseWriter, r *http.Request) {
s.dataMutex.RLock()
defer s.dataMutex.RUnlock()
if e := r.ParseForm(); e != nil {
log.Print("error parsing form")
@@ -284,7 +286,7 @@ func dataHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)
encoder.Encode(data)
encoder.Encode(s.data)
fmt.Fprint(w, ")")
}

View File

@@ -34,13 +34,13 @@ type DebugPlugin struct {
m7s.Plugin
ProfileDuration time.Duration `default:"10s" desc:"profile持续时间"`
Profile string `desc:"采集profile存储文件"`
ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"`
Grfout string `default:"grf.out" desc:"grf输出文件"`
EnableChart bool `default:"true" desc:"是否启用图表功能"`
// cached CPU profile fields
cpuProfileData *profile.Profile // cached CPU Profile data
cpuProfileOnce sync.Once // make sure the profile is collected only once
cpuProfileLock sync.Mutex // protects the cached data
chartServer server
}
type WriteToFile struct {
@@ -72,6 +72,9 @@ func (p *DebugPlugin) OnInit() error {
p.Info("cpu profile done")
}()
}
if p.EnableChart {
p.AddTask(&p.chartServer)
}
return nil
}
@@ -100,11 +103,11 @@ func (p *DebugPlugin) Charts_(w http.ResponseWriter, r *http.Request) {
}
func (p *DebugPlugin) Charts_data(w http.ResponseWriter, r *http.Request) {
dataHandler(w, r)
p.chartServer.dataHandler(w, r)
}
func (p *DebugPlugin) Charts_datafeed(w http.ResponseWriter, r *http.Request) {
s.dataFeedHandler(w, r)
p.chartServer.dataFeedHandler(w, r)
}
func (p *DebugPlugin) Grf(w http.ResponseWriter, r *http.Request) {
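
The chart server now follows the ticker contract of the task framework seen in the two diffs above: embed `task.TickTask`, do one-time setup in `Start`, report the period from `GetTickInterval`, do the periodic work in `Tick`, and let the plugin register the task with `AddTask`. A minimal sketch of that shape, assuming only the methods visible in these diffs (imports of `time` and `m7s.live/v5/pkg/task` assumed):

```go
// statsTask is a sketch of the TickTask pattern used by the chart server.
type statsTask struct {
	task.TickTask
	count int
}

// Start runs once when the task is scheduled; finish by delegating to TickTask.Start.
func (t *statsTask) Start() error {
	t.count = 0
	return t.TickTask.Start()
}

// GetTickInterval tells the framework how often Tick should fire.
func (t *statsTask) GetTickInterval() time.Duration {
	return time.Second
}

// Tick is called by the framework on every interval.
func (t *statsTask) Tick(any) {
	t.count++
}

// Registration inside a plugin mirrors p.AddTask(&p.chartServer) above:
//   p.AddTask(&statsTask{})
```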

View File

@@ -2468,12 +2468,9 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus
resp.Message = fmt.Sprintf("发送暂停请求失败: %v", err)
return resp, nil
}
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Pause()
}
return nil
})
if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok {
s.Pause()
}
gb.Info("暂停回放",
"streampath", req.StreamPath)
@@ -2522,12 +2519,9 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes
resp.Message = fmt.Sprintf("发送恢复请求失败: %v", err)
return resp, nil
}
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Resume()
}
return nil
})
if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok {
s.Resume()
}
gb.Info("恢复回放",
"streampath", req.StreamPath)
@@ -2595,14 +2589,11 @@ func (gb *GB28181Plugin) PlaybackSpeed(ctx context.Context, req *pb.PlaybackSpee
// send the request
_, err := dialog.session.TransactionRequest(ctx, request)
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
return nil
})
if s, ok := gb.Server.Streams.SafeGet(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
if err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("发送倍速请求失败: %v", err)

View File

@@ -458,11 +458,8 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord)
filePath = req.FilePath
}
res = &mp4pb.ResponseStartRecord{}
p.Server.Records.Call(func() error {
_, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath
})
return nil
_, recordExists = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath
})
if recordExists {
err = pkg.ErrRecordExists
@@ -485,19 +482,16 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord)
func (p *MP4Plugin) StopRecord(ctx context.Context, req *mp4pb.ReqStopRecord) (res *mp4pb.ResponseStopRecord, err error) {
res = &mp4pb.ResponseStopRecord{}
var recordJob *m7s.RecordJob
p.Server.Records.Call(func() error {
recordJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath
})
if recordJob != nil {
t := recordJob.GetTask()
if t != nil {
res.Data = uint64(uintptr(unsafe.Pointer(t)))
t.Stop(task.ErrStopByUser)
}
}
return nil
recordJob, _ = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath
})
if recordJob != nil {
t := recordJob.GetTask()
if t != nil {
res.Data = uint64(uintptr(unsafe.Pointer(t)))
t.Stop(task.ErrStopByUser)
}
}
return
}
@@ -519,11 +513,8 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
}
//recorder := p.Meta.Recorder(config.Record{})
var tmpJob *m7s.RecordJob
p.Server.Records.Call(func() error {
tmpJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath
})
return nil
tmpJob, _ = p.Server.Records.SafeFind(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath
})
if tmpJob == nil { // nil means no recording is in progress (no automatic recording), so perform a normal event recording
if stream, ok := p.Server.Streams.SafeGet(req.StreamPath); ok {

View File

@@ -79,10 +79,10 @@ func (nc *NetConnection) Handshake(checkC2 bool) (err error) {
if len(C1) != C1S1_SIZE {
return errors.New("C1 Error")
}
var ts int
util.GetBE(C1[4:8], &ts)
var zero int
util.GetBE(C1[4:8], &zero)
if ts == 0 {
if zero == 0 {
return nc.simple_handshake(C1, checkC2)
}
@@ -92,12 +92,26 @@ func (nc *NetConnection) Handshake(checkC2 bool) (err error) {
func (nc *NetConnection) ClientHandshake() (err error) {
C0C1 := nc.mediaDataPool.NextN(C1S1_SIZE + 1)
defer nc.mediaDataPool.Recycle()
// build C0
C0C1[0] = RTMP_HANDSHAKE_VERSION
// build C1 using the simple handshake format
C1 := C0C1[1:]
// Time (4 bytes): current timestamp
util.PutBE(C1[0:4], time.Now().Unix()&0xFFFFFFFF)
// Zero (4 bytes): must be 0 to force the simple handshake
util.PutBE(C1[4:8], 0)
// Random data (1528 bytes): fill with random bytes
for i := 8; i < C1S1_SIZE; i++ {
C1[i] = byte(rand.Int() % 256)
}
if _, err = nc.Write(C0C1); err == nil {
// read S0 S1
if _, err = io.ReadFull(nc.Conn, C0C1); err == nil {
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
err = errors.New("S1 C1 Error")
err = errors.New("S0 Error")
// C2
} else if _, err = nc.Write(C0C1[1:]); err == nil {
_, err = io.ReadFull(nc.Conn, C0C1[1:]) // S2
@@ -222,13 +236,7 @@ func clientScheme(C1 []byte, schem int) (scheme int, challenge []byte, digest []
return 0, nil, nil, false, err
}
// ok
if bytes.Compare(digest, tmp_Hash) == 0 {
ok = true
} else {
ok = false
}
ok = bytes.Equal(digest, tmp_Hash)
// challenge scheme
challenge = C1[key_offset : key_offset+C1S1_KEY_DATA_SIZE]
scheme = schem

View File

@@ -76,13 +76,10 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.Net.ReceiveSpeed, prometheus.GaugeValue, float64(net.ReceiveSpeed), net.Name)
}
}
s.Call(func() error {
for stream := range s.Streams.Range {
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "video")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "video")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio")
}
return nil
})
for stream := range s.Streams.SafeRange {
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "video")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.VideoTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "video")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.BPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.BPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio")
ch <- prometheus.MustNewConstMetric(s.prometheusDesc.FPS, prometheus.GaugeValue, float64(stream.AudioTrack.AVTrack.FPS), stream.StreamPath, stream.Plugin.Meta.Name, "audio")
}
}