diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2f6747b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# Compile Stage +FROM golang:1.23.1-bullseye AS builder + + +LABEL stage=gobuilder + +# Env +ENV CGO_ENABLE 0 +ENV GOOS linux +ENV GOARCH amd64 +#ENV GOPROXY https://goproxy.cn,direct +ENV HOME /monibuca + +WORKDIR / + +RUN git clone -b v5 --depth 1 https://github.com/langhuihui/monibuca + +# compile +WORKDIR /monibuca +RUN go build -tags sqlite -o ./build/monibuca ./example/default/main.go + +RUN cp -r /monibuca/example/default/config.yaml /monibuca/build + +# Running Stage +FROM alpine:latest + +WORKDIR /monibuca +COPY --from=builder /monibuca/build /monibuca/ +RUN cp -r ./config.yaml /etc/monibuca +# Export necessary ports +EXPOSE 8080 8443 1935 554 5060 9000-20000 +EXPOSE 5060/udp + +CMD [ "./monibuca", "-c", "/etc/monibuca/config.yaml" ] diff --git a/example/default/main.go b/example/default/main.go index 80b4480..02c5e4a 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -17,6 +17,7 @@ import ( _ "m7s.live/m7s/v5/plugin/stress" _ "m7s.live/m7s/v5/plugin/transcode" _ "m7s.live/m7s/v5/plugin/webrtc" + _ "m7s.live/m7s/v5/plugin/sei" ) func main() { diff --git a/pkg/config/types.go b/pkg/config/types.go index 375ba49..bf7d6fc 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -1,9 +1,10 @@ package config import ( - "m7s.live/m7s/v5/pkg/util" "net/url" "time" + + "m7s.live/m7s/v5/pkg/util" ) type ( @@ -56,13 +57,14 @@ type ( Fragment time.Duration `desc:"分片时长"` // 分片时长 Append bool `desc:"是否追加录制"` // 是否追加录制 } + TransfromOutput struct { + Target string `desc:"转码目标"` // 转码目标 + StreamPath string + Conf any + } Transform struct { Input any - Output []struct { - Target string `desc:"转码目标"` // 转码目标 - StreamPath string - Conf any - } + Output []TransfromOutput } OnPublish struct { Push map[Regexp]Push diff --git a/pkg/raw.go b/pkg/raw.go index ae49746..581ec3c 100644 --- a/pkg/raw.go +++ b/pkg/raw.go @@ -57,6 +57,7 @@ func (r *RawAudio) Demux(ctx codec.ICodecCtx) (any, error) { func (r *RawAudio) Mux(ctx codec.ICodecCtx, frame *AVFrame) { r.InitRecycleIndexes(0) + r.FourCC = ctx.FourCC() r.Memory = frame.Raw.(util.Memory) r.Timestamp = frame.Timestamp } @@ -158,6 +159,7 @@ func (h *H26xFrame) Demux(ctx codec.ICodecCtx) (any, error) { } func (h *H26xFrame) Mux(ctx codec.ICodecCtx, frame *AVFrame) { + h.FourCC = ctx.FourCC() h.Nalus = frame.Raw.(Nalus) h.Timestamp = frame.Timestamp h.CTS = frame.CTS diff --git a/pkg/task/job.go b/pkg/task/job.go index 1dbd5ca..9e93951 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -79,23 +79,8 @@ func (mt *Job) RangeSubTask(callback func(task ITask) bool) { } } -func (mt *Job) AddTaskLazy(t IJob) { - task := t.GetTask() - task.parent = mt - task.handler = t -} - func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { - mt.lazyRun.Do(func() { - if mt.parent != nil && mt.Context == nil { - mt.parent.AddTask(mt.handler) //from lazy - } - mt.childrenDisposed = make(chan struct{}) - mt.addSub = make(chan ITask, 10) - go mt.run() - }) - if task = t.GetTask(); task.Context == nil { - task.parentCtx = mt.Context + if task = t.GetTask(); t != task.handler { // first add for _, o := range opt { switch v := o.(type) { case context.Context: @@ -108,10 +93,29 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) { task.Logger = v } } - if task.parentCtx == nil { - panic("context is nil") - } task.parent = mt + task.handler = t + switch t.(type) { + case TaskStarter, TaskBlock, TaskGo: + // need start now + default: + // lazy start + return + } + } + + mt.lazyRun.Do(func() { + if mt.parent != nil && mt.Context == nil { + mt.parent.AddTask(mt.handler) // second add, lazy start + } + mt.childrenDisposed = make(chan struct{}) + mt.addSub = make(chan ITask, 10) + go mt.run() + }) + if task.Context == nil { + if task.parentCtx == nil { + task.parentCtx = mt.Context + } task.level = mt.level + 1 if task.ID == 0 { task.ID = GetNextTaskID() diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index aefb47f..ec4e8d2 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -23,7 +23,7 @@ type WriteFlvMetaTagQueueTask struct { var writeMetaTagQueueTask WriteFlvMetaTagQueueTask func init() { - m7s.Servers.AddTaskLazy(&writeMetaTagQueueTask) + m7s.Servers.AddTask(&writeMetaTagQueueTask) } type writeMetaTagTask struct { diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 7fe83a0..a119acb 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -52,7 +52,7 @@ func (task *writeTrailerTask) Start() (err error) { } func init() { - m7s.Servers.AddTaskLazy(&writeTrailerQueueTask) + m7s.Servers.AddTask(&writeTrailerQueueTask) } func NewRecorder() m7s.IRecorder { diff --git a/plugin/sei/index.go b/plugin/sei/index.go new file mode 100644 index 0000000..ce495e1 --- /dev/null +++ b/plugin/sei/index.go @@ -0,0 +1,73 @@ +package plugin_sei + +import ( + "io" + "net/http" + "strconv" + + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/config" + "m7s.live/m7s/v5/pkg/util" + sei "m7s.live/m7s/v5/plugin/sei/pkg" +) + +var _ = m7s.InstallPlugin[SEIPlugin](sei.NewTransform) + +type SEIPlugin struct { + m7s.Plugin +} + +func (conf *SEIPlugin) RegisterHandler() map[string]http.HandlerFunc { + return map[string]http.HandlerFunc{ + "/api/insert/{streamPath...}": conf.api_insert, + } +} + +func (conf *SEIPlugin) api_insert(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + streamPath := r.PathValue("streamPath") + targetStreamPath := q.Get("targetStreamPath") + if targetStreamPath == "" { + targetStreamPath = streamPath + "/sei" + } + ok := conf.Server.Streams.Has(streamPath) + if !ok { + util.ReturnError(util.APIErrorNoStream, streamPath+" not found", w, r) + return + } + var transformer *sei.Transformer + if tm, ok := conf.Server.Transforms.Transformed.Get(targetStreamPath); ok { + transformer, ok = tm.TransformJob.Transformer.(*sei.Transformer) + if !ok { + util.ReturnError(util.APIErrorPublish, "targetStreamPath is not a sei transformer", w, r) + return + } + } else { + transformer = sei.NewTransform().(*sei.Transformer) + conf.Transform(streamPath, config.Transform{ + Output: []config.TransfromOutput{ + { + Target: targetStreamPath, + StreamPath: targetStreamPath, + }, + }, + }) + } + t := q.Get("type") + tb, err := strconv.ParseInt(t, 10, 8) + if err != nil { + if t == "" { + tb = 5 + } else { + util.ReturnError(util.APIErrorQueryParse, "type must a number", w, r) + return + } + } + sei, err := io.ReadAll(r.Body) + if err != nil { + util.ReturnError(util.APIErrorNoBody, err.Error(), w, r) + return + } + transformer.AddSEI(byte(tb), sei) + util.ReturnOK(w, r) +} diff --git a/plugin/sei/pkg/transform.go b/plugin/sei/pkg/transform.go new file mode 100644 index 0000000..2114e5f --- /dev/null +++ b/plugin/sei/pkg/transform.go @@ -0,0 +1,90 @@ +package sei + +import ( + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" +) + +type Transformer struct { + m7s.DefaultTransformer + data chan util.Buffer + allocator *util.ScalableMemoryAllocator +} + +func (t *Transformer) AddSEI(tp byte, data []byte) { + l := len(data) + var buffer util.Buffer + buffer.WriteByte(tp) + for l >= 255 { + buffer.WriteByte(255) + l -= 255 + } + buffer.WriteByte(byte(l)) + buffer.Write(data) + buffer.WriteByte(0x80) + if len(t.data) == cap(t.data) { + <-t.data + } + t.data <- buffer +} + +func NewTransform() m7s.ITransformer { + ret := &Transformer{ + data: make(chan util.Buffer, 10), + allocator: util.NewScalableMemoryAllocator(1 << util.MinPowerOf2), + } + return ret +} + +func (t *Transformer) Start() (err error) { + return t.TransformJob.Subscribe() +} + +func (t *Transformer) Run() (err error) { + err = t.TransformJob.Publish(t.TransformJob.Config.Output[0].StreamPath) + if err != nil { + return + } + m7s.PlayBlock(t.TransformJob.Subscriber, func(audio *pkg.RawAudio) (err error) { + copyAudio := &pkg.RawAudio{ + FourCC: audio.FourCC, + Timestamp: audio.Timestamp, + } + copyAudio.SetAllocator(t.allocator) + audio.Memory.Range(func(b []byte) { + copy(copyAudio.NextN(len(b)), b) + }) + return t.TransformJob.Publisher.WriteAudio(copyAudio) + }, func(video *pkg.H26xFrame) (err error) { + copyVideo := &pkg.H26xFrame{ + FourCC: video.FourCC, + CTS: video.CTS, + Timestamp: video.Timestamp, + } + copyVideo.SetAllocator(t.allocator) + if len(t.data) > 0 { + for seiFrame := range t.data { + switch video.FourCC { + case codec.FourCC_H264: + var seiNalu util.Memory + seiNalu.Append([]byte{byte(codec.NALU_SEI)}, seiFrame) + copyVideo.Nalus = append(copyVideo.Nalus, seiNalu) + } + for _, nalu := range video.Nalus { + mem := copyVideo.NextN(nalu.Size) + copy(mem, nalu.ToBytes()) + copyVideo.Nalus.Append(mem) + } + } + } + return t.TransformJob.Publisher.WriteVideo(copyVideo) + }) + return +} + +func (t *Transformer) Dispose() { + close(t.data) + t.allocator.Recycle() +} diff --git a/server.go b/server.go index bcb8f4e..a01d462 100644 --- a/server.go +++ b/server.go @@ -246,12 +246,12 @@ func (s *Server) Start() (err error) { return } } - s.AddTaskLazy(&s.Records) - s.AddTaskLazy(&s.Streams) - s.AddTaskLazy(&s.Pulls) - s.AddTaskLazy(&s.Pushs) - s.AddTaskLazy(&s.Transforms) - s.AddTaskLazy(&s.Devices) + s.AddTask(&s.Records) + s.AddTask(&s.Streams) + s.AddTask(&s.Pulls) + s.AddTask(&s.Pushs) + s.AddTask(&s.Transforms) + s.AddTask(&s.Devices) promReg := prometheus.NewPedanticRegistry() promReg.MustRegister(s) for _, plugin := range plugins { diff --git a/transformer.go b/transformer.go index 8654708..109ef9b 100644 --- a/transformer.go +++ b/transformer.go @@ -20,7 +20,7 @@ type ( Plugin *Plugin Publisher *Publisher Subscriber *Subscriber - transformer ITransformer + Transformer ITransformer } DefaultTransformer struct { task.Job @@ -49,12 +49,12 @@ func (p *TransformJob) GetKey() string { } func (p *TransformJob) Subscribe() (err error) { - p.Subscriber, err = p.Plugin.Subscribe(p.transformer, p.StreamPath) + p.Subscriber, err = p.Plugin.Subscribe(p.Transformer, p.StreamPath) return } func (p *TransformJob) Publish(streamPath string) (err error) { - p.Publisher, err = p.Plugin.Publish(p.transformer, streamPath) + p.Publisher, err = p.Plugin.Publish(p.Transformer, streamPath) return } @@ -62,7 +62,7 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath p.Plugin = plugin p.Config = conf p.StreamPath = streamPath - p.transformer = transformer + p.Transformer = transformer p.Description = map[string]any{ "streamPath": streamPath, "conf": conf, @@ -89,7 +89,7 @@ func (p *TransformJob) Start() (err error) { }) } } - p.AddTask(p.transformer, p.Logger) + p.AddTask(p.Transformer, p.Logger) return }