fix: pull publisher block

This commit is contained in:
langhuihui
2024-10-04 17:44:47 +08:00
parent f101d34de0
commit c57200178d
22 changed files with 133 additions and 62 deletions

7
api.go
View File

@@ -553,9 +553,10 @@ func (s *Server) GetDeviceList(ctx context.Context, req *emptypb.Empty) (res *pb
Name: device.Name, Name: device.Name,
CreateTime: timestamppb.New(device.CreatedAt), CreateTime: timestamppb.New(device.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt), UpdateTime: timestamppb.New(device.UpdatedAt),
Type: uint32(device.Type), Type: device.Type,
PullURL: device.PullURL, PullURL: device.PullURL,
ParentID: uint32(device.ParentID), ParentID: uint32(device.ParentID),
Status: uint32(device.Status),
ID: uint32(device.ID), ID: uint32(device.ID),
}) })
} }
@@ -566,7 +567,7 @@ func (s *Server) AddDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.Suc
device := &Device{ device := &Device{
server: s, server: s,
Name: req.Name, Name: req.Name,
Type: byte(req.Type), Type: req.Type,
PullURL: req.PullURL, PullURL: req.PullURL,
ParentID: uint(req.ParentID), ParentID: uint(req.ParentID),
} }
@@ -590,7 +591,7 @@ func (s *Server) UpdateDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.
target.Name = req.Name target.Name = req.Name
target.PullURL = req.PullURL target.PullURL = req.PullURL
target.ParentID = uint(req.ParentID) target.ParentID = uint(req.ParentID)
target.Type = byte(req.Type) target.Type = req.Type
s.DB.Save(target) s.DB.Save(target)
res = &pb.SuccessResponse{} res = &pb.SuccessResponse{}
return return

View File

@@ -13,14 +13,6 @@ const (
DeviceStatusPulling DeviceStatusPulling
) )
const (
DeviceTypeGroup byte = iota
DeviceTypeGB
DeviceTypeRTSP
DeviceTypeRTMP
DeviceTypeWebRTC
)
type ( type (
IDevice interface { IDevice interface {
Pull() Pull()
@@ -32,7 +24,7 @@ type (
Name string Name string
PullURL string PullURL string
ParentID uint ParentID uint
Type byte Type string
Status byte Status byte
Handler IDevice `gorm:"-:all"` Handler IDevice `gorm:"-:all"`
} }
@@ -42,7 +34,7 @@ type (
) )
func (d *Device) GetStreamPath() string { func (d *Device) GetStreamPath() string {
return fmt.Sprintf("device/%d/%d", d.Type, d.ID) return fmt.Sprintf("device/%s/%d", d.Type, d.ID)
} }
func (d *Device) Start() (err error) { func (d *Device) Start() (err error) {
@@ -58,10 +50,16 @@ func (d *Device) Start() (err error) {
} }
func (d *Device) ChangeStatus(status byte) { func (d *Device) ChangeStatus(status byte) {
if d.Status == status {
return
}
d.Info("device status changed", "from", d.Status, "to", status)
d.Status = status d.Status = status
d.Update() d.Update()
} }
func (d *Device) Update() { func (d *Device) Update() {
d.server.DB.Save(d) if d.server.DB != nil {
d.server.DB.Save(d)
}
} }

View File

@@ -1,7 +1,14 @@
global: global:
http:
listenaddr: :8081
tcp:
listenaddr: :50052
device: device:
- name: default - id: 1
name: default
description: Example device description: Example device
stream: pullurl: rtsp://localhost/live/test
- rtsp://xxx.xxx.xxx.xxx:554/live/test
rtsp:
tcp: :8554

Binary file not shown.

View File

@@ -1,5 +1,10 @@
global: global:
loglevel: trace # loglevel: trace
disableall: true
rtsp:
enable: true
listenaddr: :554
flv: flv:
enable: true
pull: pull:
live/test: record/live/test/sei.flv live/test: /Users/dexter/Movies/jb-demo.flv

View File

@@ -1,6 +1,10 @@
global: global:
loglevel: trace # loglevel: trace
disableall: true
rtsp:
enable: true
listenaddr: :554
mp4: mp4:
enable: true
pull: pull:
pullonstart: live/test: /Users/dexter/Movies/test.mp4
live/test: /Users/dexter/Movies/test.mp4

View File

@@ -2219,7 +2219,7 @@ type DeviceInfo struct {
UpdateTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=updateTime,proto3" json:"updateTime,omitempty"` // 更新时间 UpdateTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=updateTime,proto3" json:"updateTime,omitempty"` // 更新时间
ParentID uint32 `protobuf:"varint,4,opt,name=parentID,proto3" json:"parentID,omitempty"` // 父设备ID ParentID uint32 `protobuf:"varint,4,opt,name=parentID,proto3" json:"parentID,omitempty"` // 父设备ID
Name string `protobuf:"bytes,5,opt,name=name,proto3" json:"name,omitempty"` // 设备名称 Name string `protobuf:"bytes,5,opt,name=name,proto3" json:"name,omitempty"` // 设备名称
Type uint32 `protobuf:"varint,6,opt,name=type,proto3" json:"type,omitempty"` // 设备类型 Type string `protobuf:"bytes,6,opt,name=type,proto3" json:"type,omitempty"` // 设备类型
Status uint32 `protobuf:"varint,7,opt,name=status,proto3" json:"status,omitempty"` // 设备状态 Status uint32 `protobuf:"varint,7,opt,name=status,proto3" json:"status,omitempty"` // 设备状态
PullURL string `protobuf:"bytes,8,opt,name=pullURL,proto3" json:"pullURL,omitempty"` // 拉流地址 PullURL string `protobuf:"bytes,8,opt,name=pullURL,proto3" json:"pullURL,omitempty"` // 拉流地址
} }
@@ -2291,11 +2291,11 @@ func (x *DeviceInfo) GetName() string {
return "" return ""
} }
func (x *DeviceInfo) GetType() uint32 { func (x *DeviceInfo) GetType() string {
if x != nil { if x != nil {
return x.Type return x.Type
} }
return 0 return ""
} }
func (x *DeviceInfo) GetStatus() uint32 { func (x *DeviceInfo) GetStatus() uint32 {
@@ -2638,7 +2638,7 @@ var file_global_proto_rawDesc = []byte{
0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x44,
0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74,
0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
0x12, 0x18, 0x0a, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x18, 0x08, 0x20, 0x01, 0x28, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x18, 0x08, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x32, 0xdb, 0x0f, 0x0a, 0x03, 0x61, 0x09, 0x52, 0x07, 0x70, 0x75, 0x6c, 0x6c, 0x55, 0x52, 0x4c, 0x32, 0xdb, 0x0f, 0x0a, 0x03, 0x61,

View File

@@ -337,7 +337,7 @@ message DeviceInfo {
google.protobuf.Timestamp updateTime = 3; // 更新时间 google.protobuf.Timestamp updateTime = 3; // 更新时间
uint32 parentID = 4; // 父设备ID uint32 parentID = 4; // 父设备ID
string name = 5; // 设备名称 string name = 5; // 设备名称
uint32 type = 6; // 设备类型 string type = 6; // 设备类型
uint32 status = 7; // 设备状态 uint32 status = 7; // 设备状态
string pullURL = 8; // 拉流地址 string pullURL = 8; // 拉流地址
} }

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"log/slog" "log/slog"
"reflect" "reflect"
"runtime/debug"
"slices" "slices"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -155,12 +156,14 @@ func (mt *Job) Post(callback func() error, args ...any) *Task {
func (mt *Job) run() { func (mt *Job) run() {
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}} cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(mt.addSub)}}
defer func() { defer func() {
err := recover() if !ThrowPanic {
if err != nil { err := recover()
if mt.Logger != nil { if err != nil {
mt.Logger.Error("job panic", "err", err) if mt.Logger != nil {
mt.Logger.Error("job panic", "err", err, "stack", string(debug.Stack()))
}
mt.Stop(errors.Join(err.(error), ErrPanic))
} }
mt.Stop(errors.Join(err.(error), ErrPanic))
} }
stopReason := mt.StopReason() stopReason := mt.StopReason()
for _, task := range mt.children { for _, task := range mt.children {

3
pkg/task/panic.go Normal file
View File

@@ -0,0 +1,3 @@
package task
var ThrowPanic = false

6
pkg/task/panic_true.go Normal file
View File

@@ -0,0 +1,6 @@
//go:build taskpanic
// +build taskpanic
package task
var ThrowPanic = true

View File

@@ -257,14 +257,16 @@ func (task *Task) checkRetry(err error) bool {
func (task *Task) start() bool { func (task *Task) start() bool {
var err error var err error
defer func() { if !ThrowPanic {
if r := recover(); r != nil { defer func() {
err = errors.New(fmt.Sprint(r)) if r := recover(); r != nil {
if task.Logger != nil { err = errors.New(fmt.Sprint(r))
task.Error("panic", "error", err, "stack", string(debug.Stack())) if task.Logger != nil {
task.Error("panic", "error", err, "stack", string(debug.Stack()))
}
} }
} }()
}() }
for { for {
task.StartTime = time.Now() task.StartTime = time.Now()
if task.Logger != nil { if task.Logger != nil {

View File

@@ -425,7 +425,9 @@ func (p *Plugin) OnSubscribe(sub *Subscriber) {
} }
} }
for device := range p.Server.Devices.Range { for device := range p.Server.Devices.Range {
device.Handler.Pull() if device.Status == DeviceStatusOnline && device.GetStreamPath() == sub.StreamPath {
device.Handler.Pull()
}
} }
//if !avoidTrans { //if !avoidTrans {
// for reg, conf := range plugin.GetCommonConf().OnSub.Transform { // for reg, conf := range plugin.GetCommonConf().OnSub.Transform {

View File

@@ -104,7 +104,7 @@ func (gb *GB28181Plugin) OnInit() (err error) {
} }
func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) (ret task.ITask) { func (p *GB28181Plugin) OnDeviceAdd(device *m7s.Device) (ret task.ITask) {
if device.Type != m7s.DeviceTypeGB { if device.Type != "gb28181" {
return return
} }
deviceID, channelID, _ := strings.Cut(device.PullURL, "/") deviceID, channelID, _ := strings.Cut(device.PullURL, "/")
@@ -323,7 +323,7 @@ func (gb *GB28181Plugin) StoreDevice(id string, req *sip.Request) (d *Device) {
gb.devices.Add(d) gb.devices.Add(d)
d.channels.OnAdd(func(c *Channel) { d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.Devices.Find(func(absDevice *m7s.Device) bool { if absDevice, ok := gb.Server.Devices.Find(func(absDevice *m7s.Device) bool {
return absDevice.Type == m7s.DeviceTypeGB && absDevice.PullURL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID) return absDevice.Type == "gb28181" && absDevice.PullURL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID)
}); ok { }); ok {
c.AbstractDevice = absDevice c.AbstractDevice = absDevice
absDevice.Handler = c absDevice.Handler = c

View File

@@ -6,6 +6,7 @@ import (
"m7s.live/m7s/v5" "m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
. "m7s.live/m7s/v5/plugin/rtsp/pkg" . "m7s.live/m7s/v5/plugin/rtsp/pkg"
) )
@@ -17,12 +18,11 @@ type RTSPDevice struct {
} }
func (d *RTSPDevice) Start() (err error) { func (d *RTSPDevice) Start() (err error) {
d.conn.NetConnection = new(NetConnection) d.conn.NetConnection = &NetConnection{
err = d.conn.Connect(d.device.PullURL) MemoryAllocator: util.NewScalableMemoryAllocator(1 << 12),
if err != nil { UserAgent: "monibuca" + m7s.Version,
return
} }
d.device.ChangeStatus(m7s.DeviceStatusOnline) d.conn.Logger = d.plugin.Logger
return d.TickTask.Start() return d.TickTask.Start()
} }
@@ -31,13 +31,20 @@ func (d *RTSPDevice) GetTickInterval() time.Duration {
} }
func (d *RTSPDevice) Pull() { func (d *RTSPDevice) Pull() {
d.plugin.Pull(d.device.GetStreamPath(), config.Pull{URL: d.device.PullURL,MaxRetry: -1}) d.plugin.Pull(d.device.GetStreamPath(), config.Pull{URL: d.device.PullURL, MaxRetry: -1, RetryInterval: time.Second * 5})
} }
func (d *RTSPDevice) Tick(any) { func (d *RTSPDevice) Tick(any) {
if d.device.Status != m7s.DeviceStatusOnline {
err := d.conn.Connect(d.device.PullURL)
if err != nil {
return
}
d.device.ChangeStatus(m7s.DeviceStatusOnline)
}
err := d.conn.Options() err := d.conn.Options()
if err != nil { if err != nil {
d.Stop(err) d.device.ChangeStatus(m7s.DeviceStatusOffline)
} }
} }

View File

@@ -25,7 +25,7 @@ func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
} }
func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) task.ITask { func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) task.ITask {
if device.Type != m7s.DeviceTypeRTSP { if device.Type != "rtsp" {
return nil return nil
} }
ret := &RTSPDevice{device: device, plugin: p} ret := &RTSPDevice{device: device, plugin: p}

View File

@@ -53,7 +53,6 @@ func NewPusher() m7s.IPusher {
} }
func (c *Client) Run() (err error) { func (c *Client) Run() (err error) {
c.BufReader = util.NewBufReader(c.conn)
c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12) c.MemoryAllocator = util.NewScalableMemoryAllocator(1 << 12)
if err = c.Options(); err != nil { if err = c.Options(); err != nil {
return return

View File

@@ -135,6 +135,7 @@ func (c *NetConnection) Connect(remoteURL string) (err error) {
return return
} }
c.conn = conn c.conn = conn
c.BufReader = util.NewBufReader(conn)
c.URL = rtspURL c.URL = rtspURL
c.UserAgent = "monibuca" + m7s.Version c.UserAgent = "monibuca" + m7s.Version
c.auth = util.NewAuth(c.URL.User) c.auth = util.NewAuth(c.URL.User)

View File

@@ -21,7 +21,9 @@ type SRTPlugin struct {
Passphrase string Passphrase string
} }
var _ = m7s.InstallPlugin[SRTPlugin](pkg.NewPuller, pkg.NewPusher) const defaultConfig = m7s.DefaultYaml(`listenaddr: :6000`)
var _ = m7s.InstallPlugin[SRTPlugin](defaultConfig,pkg.NewPuller, pkg.NewPusher)
func (p *SRTPlugin) OnInit() error { func (p *SRTPlugin) OnInit() error {
var t SRTServer var t SRTServer

4
scripts/README.md Normal file
View File

@@ -0,0 +1,4 @@
# use protoc to generate the go code from the proto file
1. cd to plugin/xxx
2. sh ../../scripts/protoc.sh

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"runtime/debug" "runtime/debug"
@@ -59,7 +60,7 @@ type (
ID uint ID uint
ParentID uint ParentID uint
Name string Name string
Type byte Type string
PullURL string PullURL string
} }
} }
@@ -284,10 +285,7 @@ func (s *Server) Start() (err error) {
s.Streams.OnStart(func() { s.Streams.OnStart(func() {
s.Streams.AddTask(&CheckSubWaitTimeout{s: s}) s.Streams.AddTask(&CheckSubWaitTimeout{s: s})
}) })
s.Transforms.OnStart(func() { s.Transforms.AddTask(&TransformsPublishEvent{Transforms: &s.Transforms})
publishEvent := &TransformsPublishEvent{Transforms: &s.Transforms}
s.Transforms.AddTask(publishEvent)
})
s.Info("server started") s.Info("server started")
s.Post(func() error { s.Post(func() error {
for plugin := range s.Plugins.Range { for plugin := range s.Plugins.Range {
@@ -309,10 +307,34 @@ func (s *Server) Start() (err error) {
d.ParentID = device.ParentID d.ParentID = device.ParentID
d.server = s d.server = s
d.Type = device.Type d.Type = device.Type
if d.Type == "" {
if strings.HasPrefix(d.PullURL, "srt://") {
d.Type = "srt"
} else if strings.HasPrefix(d.PullURL, "rtsp://") {
d.Type = "rtsp"
} else if strings.HasPrefix(d.PullURL, "rtmp://") {
d.Type = "rtmp"
} else if strings.HasPrefix(d.PullURL, "srt://") {
d.Type = "srt"
} else {
u, err := url.Parse(d.PullURL)
if err != nil {
s.Error("parse pull url failed", "error", err)
continue
}
if strings.HasSuffix(u.Path, ".m3u8") {
d.Type = "hls"
} else if strings.HasSuffix(u.Path, ".flv") {
d.Type = "flv"
} else if strings.HasSuffix(u.Path, ".mp4") {
d.Type = "mp4"
}
}
}
if s.DB != nil { if s.DB != nil {
s.DB.Save(&d) s.DB.Save(&d)
} else { } else {
s.Devices.Add(&d) s.Devices.Add(&d, s.Logger.With("device", device.ID, "type", device.Type, "name", device.Name))
} }
} }
} }
@@ -321,7 +343,7 @@ func (s *Server) Start() (err error) {
s.DB.Find(&devices) s.DB.Find(&devices)
for _, device := range devices { for _, device := range devices {
device.server = s device.server = s
s.Devices.Add(device) s.Devices.Add(device, s.Logger.With("device", device.ID, "type", device.Type, "name", device.Name))
} }
} }
return nil return nil

View File

@@ -23,6 +23,7 @@ import (
var AVFrameType = reflect.TypeOf((*AVFrame)(nil)) var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
var Owner task.TaskContextKey = "owner" var Owner task.TaskContextKey = "owner"
type PubSubBase struct { type PubSubBase struct {
task.Job task.Job
Plugin *Plugin Plugin *Plugin
@@ -321,10 +322,14 @@ func (handler *SubscribeHandler[A, V]) Start() (err error) {
} }
checkPublisherChange := func() { checkPublisherChange := func() {
if prePublisher != s.Publisher { if prePublisher != s.Publisher {
if s.Publisher == nil { if prePublisher != nil {
s.Info("publisher gone", "prePublisher", prePublisher.ID) if s.Publisher == nil {
s.Info("publisher gone", "prePublisher", prePublisher.ID)
} else {
s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID)
}
} else { } else {
s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) s.Info("publisher recover", "publisher", s.Publisher.ID)
} }
if s.AudioReader != nil { if s.AudioReader != nil {
startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond