diff --git a/api.go b/api.go index 67df936..640758a 100644 --- a/api.go +++ b/api.go @@ -553,9 +553,10 @@ func (s *Server) GetDeviceList(ctx context.Context, req *emptypb.Empty) (res *pb Name: device.Name, CreateTime: timestamppb.New(device.CreatedAt), UpdateTime: timestamppb.New(device.UpdatedAt), - Type: uint32(device.Type), + Type: device.Type, PullURL: device.PullURL, ParentID: uint32(device.ParentID), + Status: uint32(device.Status), ID: uint32(device.ID), }) } @@ -566,7 +567,7 @@ func (s *Server) AddDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.Suc device := &Device{ server: s, Name: req.Name, - Type: byte(req.Type), + Type: req.Type, PullURL: req.PullURL, ParentID: uint(req.ParentID), } @@ -590,7 +591,7 @@ func (s *Server) UpdateDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb. target.Name = req.Name target.PullURL = req.PullURL target.ParentID = uint(req.ParentID) - target.Type = byte(req.Type) + target.Type = req.Type s.DB.Save(target) res = &pb.SuccessResponse{} return diff --git a/device.go b/device.go index 6d2c686..eb643dd 100644 --- a/device.go +++ b/device.go @@ -13,14 +13,6 @@ const ( DeviceStatusPulling ) -const ( - DeviceTypeGroup byte = iota - DeviceTypeGB - DeviceTypeRTSP - DeviceTypeRTMP - DeviceTypeWebRTC -) - type ( IDevice interface { Pull() @@ -32,7 +24,7 @@ type ( Name string PullURL string ParentID uint - Type byte + Type string Status byte Handler IDevice `gorm:"-:all"` } @@ -42,7 +34,7 @@ type ( ) func (d *Device) GetStreamPath() string { - return fmt.Sprintf("device/%d/%d", d.Type, d.ID) + return fmt.Sprintf("device/%s/%d", d.Type, d.ID) } func (d *Device) Start() (err error) { @@ -58,10 +50,16 @@ func (d *Device) Start() (err error) { } func (d *Device) ChangeStatus(status byte) { + if d.Status == status { + return + } + d.Info("device status changed", "from", d.Status, "to", status) d.Status = status d.Update() } func (d *Device) Update() { - d.server.DB.Save(d) + if d.server.DB != nil { + d.server.DB.Save(d) + } } diff --git a/example/default/device.yaml b/example/default/device.yaml index 4db63bf..5f715d9 100644 --- a/example/default/device.yaml +++ b/example/default/device.yaml @@ -1,7 +1,14 @@ global: + http: + listenaddr: :8081 + tcp: + listenaddr: :50052 device: - - name: default + - id: 1 + name: default description: Example device - stream: - - rtsp://xxx.xxx.xxx.xxx:554/live/test + pullurl: rtsp://localhost/live/test + +rtsp: + tcp: :8554 diff --git a/example/default/m7s.db-journal b/example/default/m7s.db-journal deleted file mode 100644 index 8fcd82f..0000000 Binary files a/example/default/m7s.db-journal and /dev/null differ diff --git a/example/default/readflv.yaml b/example/default/readflv.yaml index 85f1cfc..960b0f1 100644 --- a/example/default/readflv.yaml +++ b/example/default/readflv.yaml @@ -1,5 +1,10 @@ global: - loglevel: trace + # loglevel: trace + disableall: true +rtsp: + enable: true + listenaddr: :554 flv: + enable: true pull: - live/test: record/live/test/sei.flv \ No newline at end of file + live/test: /Users/dexter/Movies/jb-demo.flv \ No newline at end of file diff --git a/example/default/readmp4.yaml b/example/default/readmp4.yaml index d82fc16..1b0f4c5 100644 --- a/example/default/readmp4.yaml +++ b/example/default/readmp4.yaml @@ -1,6 +1,10 @@ global: - loglevel: trace + # loglevel: trace + disableall: true +rtsp: + enable: true + listenaddr: :554 mp4: + enable: true pull: - pullonstart: - live/test: /Users/dexter/Movies/test.mp4 \ No newline at end of file + live/test: /Users/dexter/Movies/test.mp4 \ No newline at end of file diff --git a/pb/global.pb.go b/pb/global.pb.go index ef43bc4..f0c35b6 100644 --- a/pb/global.pb.go +++ b/pb/global.pb.go @@ -2219,7 +2219,7 @@ type DeviceInfo struct { UpdateTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=updateTime,proto3" json:"updateTime,omitempty"` // 更新时间 ParentID uint32 `protobuf:"varint,4,opt,name=parentID,proto3" json:"parentID,omitempty"` // 父设备ID Name string `protobuf:"bytes,5,opt,name=name,proto3" json:"name,omitempty"` // 设备名称 - Type uint32 `protobuf:"varint,6,opt,name=type,proto3" json:"type,omitempty"` // 设备类型 + Type string `protobuf:"bytes,6,opt,name=type,proto3" json:"type,omitempty"` // 设备类型 Status uint32 `protobuf:"varint,7,opt,name=status,proto3" json:"status,omitempty"` // 设备状态 PullURL string `protobuf:"bytes,8,opt,name=pullURL,proto3" json:"pullURL,omitempty"` // 拉流地址 } @@ -2291,11 +2291,11 @@ func (x *DeviceInfo) GetName() string { return "" } -func (x *DeviceInfo) GetType() uint32 { +func (x *DeviceInfo) GetType() string { if x != nil { return x.Type } - return 0 + return "" } func (x *DeviceInfo) GetStatus() uint32 { @@ -2638,7 +2638,7 @@ var file_global_proto_rawDesc = []byte{ 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x32, 0xdb, 0x0f, 0x0a, 0x03, 0x61, diff --git a/pb/global.proto b/pb/global.proto index e95cd59..5ebe460 100644 --- a/pb/global.proto +++ b/pb/global.proto @@ -337,7 +337,7 @@ message DeviceInfo { google.protobuf.Timestamp updateTime = 3; // 更新时间 uint32 parentID = 4; // 父设备ID string name = 5; // 设备名称 - uint32 type = 6; // 设备类型 + string type = 6; // 设备类型 uint32 status = 7; // 设备状态 string pullURL = 8; // 拉流地址 } \ No newline at end of file diff --git a/pkg/task/job.go b/pkg/task/job.go index e811b1d..6cf37cc 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" "reflect" + "runtime/debug" "slices" "sync" "sync/atomic" @@ -155,12 +156,14 @@ func (mt *Job) Post(callback func() error, args ...any) *Task { func (mt *Job) run() { cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}} defer func() { - err := recover() - if err != nil { - if mt.Logger != nil { - mt.Logger.Error("job panic", "err", err) + if !ThrowPanic { + err := recover() + if err != nil { + if mt.Logger != nil { + mt.Logger.Error("job panic", "err", err, "stack", string(debug.Stack())) + } + mt.Stop(errors.Join(err.(error), ErrPanic)) } - mt.Stop(errors.Join(err.(error), ErrPanic)) } stopReason := mt.StopReason() for _, task := range mt.children { diff --git a/pkg/task/panic.go b/pkg/task/panic.go new file mode 100644 index 0000000..0216d01 --- /dev/null +++ b/pkg/task/panic.go @@ -0,0 +1,3 @@ +package task + +var ThrowPanic = false \ No newline at end of file diff --git a/pkg/task/panic_true.go b/pkg/task/panic_true.go new file mode 100644 index 0000000..bfb93dd --- /dev/null +++ b/pkg/task/panic_true.go @@ -0,0 +1,6 @@ +//go:build taskpanic +// +build taskpanic + +package task + +var ThrowPanic = true \ No newline at end of file diff --git a/pkg/task/task.go b/pkg/task/task.go index 1139b66..4027cbf 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -257,14 +257,16 @@ func (task *Task) checkRetry(err error) bool { func (task *Task) start() bool { var err error - defer func() { - if r := recover(); r != nil { - err = errors.New(fmt.Sprint(r)) - if task.Logger != nil { - task.Error("panic", "error", err, "stack", string(debug.Stack())) + if !ThrowPanic { + defer func() { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + if task.Logger != nil { + task.Error("panic", "error", err, "stack", string(debug.Stack())) + } } - } - }() + }() + } for { task.StartTime = time.Now() if task.Logger != nil { diff --git a/plugin.go b/plugin.go index 13cafb4..d081ab3 100644 --- a/plugin.go +++ b/plugin.go @@ -425,7 +425,9 @@ func (p *Plugin) OnSubscribe(sub *Subscriber) { } } for device := range p.Server.Devices.Range { - device.Handler.Pull() + if device.Status == DeviceStatusOnline && device.GetStreamPath() == sub.StreamPath { + device.Handler.Pull() + } } //if !avoidTrans { // for reg, conf := range plugin.GetCommonConf().OnSub.Transform { diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index b54a5c4..b0ab2b4 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -104,7 +104,7 @@ func (gb *GB28181Plugin) OnInit() (err error) { } func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) (ret task.ITask) { - if device.Type != m7s.DeviceTypeGB { + if device.Type != "gb28181" { return } deviceID, channelID, _ := strings.Cut(device.PullURL, "/") @@ -323,7 +323,7 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) { gb.devices.Add(d) d.channels.OnAdd(func(c *Channel) { if absDevice, ok := gb.Server.Devices.Find(func(absDevice *m7s.Device) bool { - return absDevice.Type == m7s.DeviceTypeGB && absDevice.PullURL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID) + return absDevice.Type == "gb28181" && absDevice.PullURL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID) }); ok { c.AbstractDevice = absDevice absDevice.Handler = c diff --git a/plugin/rtsp/device.go b/plugin/rtsp/device.go index 0a9f25d..603ff4f 100644 --- a/plugin/rtsp/device.go +++ b/plugin/rtsp/device.go @@ -6,6 +6,7 @@ import ( "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/task" + "m7s.live/m7s/v5/pkg/util" . "m7s.live/m7s/v5/plugin/rtsp/pkg" ) @@ -17,12 +18,11 @@ type RTSPDevice struct { } func (d *RTSPDevice) Start() (err error) { - d.conn.NetConnection = new(NetConnection) - err = d.conn.Connect(d.device.PullURL) - if err != nil { - return + d.conn.NetConnection = &NetConnection{ + MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12), + UserAgent: "monibuca" + m7s.Version, } - d.device.ChangeStatus(m7s.DeviceStatusOnline) + d.conn.Logger = d.plugin.Logger return d.TickTask.Start() } @@ -31,13 +31,20 @@ func (d *RTSPDevice) GetTickInterval() time.Duration { } func (d *RTSPDevice) Pull() { - d.plugin.Pull(d.device.GetStreamPath(), config.Pull{URL: d.device.PullURL,MaxRetry: -1}) + d.plugin.Pull(d.device.GetStreamPath(), config.Pull{URL: d.device.PullURL, MaxRetry: -1, RetryInterval: time.Second * 5}) } func (d *RTSPDevice) Tick(any) { + if d.device.Status != m7s.DeviceStatusOnline { + err := d.conn.Connect(d.device.PullURL) + if err != nil { + return + } + d.device.ChangeStatus(m7s.DeviceStatusOnline) + } err := d.conn.Options() if err != nil { - d.Stop(err) + d.device.ChangeStatus(m7s.DeviceStatusOffline) } } diff --git a/plugin/rtsp/index.go b/plugin/rtsp/index.go index e9236fc..b4c5558 100644 --- a/plugin/rtsp/index.go +++ b/plugin/rtsp/index.go @@ -25,7 +25,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask { } func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) task.ITask { - if device.Type != m7s.DeviceTypeRTSP { + if device.Type != "rtsp" { return nil } ret := &RTSPDevice{device: device, plugin: p} diff --git a/plugin/rtsp/pkg/client.go b/plugin/rtsp/pkg/client.go index 26b6cf1..e9de020 100644 --- a/plugin/rtsp/pkg/client.go +++ b/plugin/rtsp/pkg/client.go @@ -53,7 +53,6 @@ func NewPusher() m7s.IPusher { } func (c *Client) Run() (err error) { - c.BufReader = util.NewBufReader(c.conn) c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12) if err = c.Options(); err != nil { return diff --git a/plugin/rtsp/pkg/connection.go b/plugin/rtsp/pkg/connection.go index 6f834c1..384dc0a 100644 --- a/plugin/rtsp/pkg/connection.go +++ b/plugin/rtsp/pkg/connection.go @@ -135,6 +135,7 @@ func (c *NetConnection) Connect(remoteURL string) (err error) { return } c.conn = conn + c.BufReader = util.NewBufReader(conn) c.URL = rtspURL c.UserAgent = "monibuca" + m7s.Version c.auth = util.NewAuth(c.URL.User) diff --git a/plugin/srt/index.go b/plugin/srt/index.go index 2a3b347..e079fee 100644 --- a/plugin/srt/index.go +++ b/plugin/srt/index.go @@ -21,7 +21,9 @@ type SRTPlugin struct { Passphrase string } -var _ = m7s.InstallPlugin[SRTPlugin](pkg.NewPuller, pkg.NewPusher) +const defaultConfig = m7s.DefaultYaml(`listenaddr: :6000`) + +var _ = m7s.InstallPlugin[SRTPlugin](defaultConfig,pkg.NewPuller, pkg.NewPusher) func (p *SRTPlugin) OnInit() error { var t SRTServer diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..b204fb1 --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,4 @@ +# use protoc to generate the go code from the proto file + +1. cd to plugin/xxx +2. sh ../../scripts/protoc.sh \ No newline at end of file diff --git a/server.go b/server.go index d045624..68cf69a 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "net/http" + "net/url" "os" "path/filepath" "runtime/debug" @@ -59,7 +60,7 @@ type ( ID uint ParentID uint Name string - Type byte + Type string PullURL string } } @@ -284,10 +285,7 @@ func (s *Server) Start() (err error) { s.Streams.OnStart(func() { s.Streams.AddTask(&CheckSubWaitTimeout{s: s}) }) - s.Transforms.OnStart(func() { - publishEvent := &TransformsPublishEvent{Transforms: &s.Transforms} - s.Transforms.AddTask(publishEvent) - }) + s.Transforms.AddTask(&TransformsPublishEvent{Transforms: &s.Transforms}) s.Info("server started") s.Post(func() error { for plugin := range s.Plugins.Range { @@ -309,10 +307,34 @@ func (s *Server) Start() (err error) { d.ParentID = device.ParentID d.server = s d.Type = device.Type + if d.Type == "" { + if strings.HasPrefix(d.PullURL, "srt://") { + d.Type = "srt" + } else if strings.HasPrefix(d.PullURL, "rtsp://") { + d.Type = "rtsp" + } else if strings.HasPrefix(d.PullURL, "rtmp://") { + d.Type = "rtmp" + } else if strings.HasPrefix(d.PullURL, "srt://") { + d.Type = "srt" + } else { + u, err := url.Parse(d.PullURL) + if err != nil { + s.Error("parse pull url failed", "error", err) + continue + } + if strings.HasSuffix(u.Path, ".m3u8") { + d.Type = "hls" + } else if strings.HasSuffix(u.Path, ".flv") { + d.Type = "flv" + } else if strings.HasSuffix(u.Path, ".mp4") { + d.Type = "mp4" + } + } + } if s.DB != nil { s.DB.Save(&d) } else { - s.Devices.Add(&d) + s.Devices.Add(&d, s.Logger.With("device", device.ID, "type", device.Type, "name", device.Name)) } } } @@ -321,7 +343,7 @@ func (s *Server) Start() (err error) { s.DB.Find(&devices) for _, device := range devices { device.server = s - s.Devices.Add(device) + s.Devices.Add(device, s.Logger.With("device", device.ID, "type", device.Type, "name", device.Name)) } } return nil diff --git a/subscriber.go b/subscriber.go index a196389..ea4180c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -23,6 +23,7 @@ import ( var AVFrameType = reflect.TypeOf((*AVFrame)(nil)) var Owner task.TaskContextKey = "owner" + type PubSubBase struct { task.Job Plugin *Plugin @@ -321,10 +322,14 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) { } checkPublisherChange := func() { if prePublisher != s.Publisher { - if s.Publisher == nil { - s.Info("publisher gone", "prePublisher", prePublisher.ID) + if prePublisher != nil { + if s.Publisher == nil { + s.Info("publisher gone", "prePublisher", prePublisher.ID) + } else { + s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) + } } else { - s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) + s.Info("publisher recover", "publisher", s.Publisher.ID) } if s.AudioReader != nil { startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond