feat: add sei plugin

This commit is contained in:
langhuihui
2024-09-25 17:56:59 +08:00
parent b91f814c5f
commit c08514d20e
11 changed files with 244 additions and 38 deletions

34
Dockerfile Normal file
View File

@@ -0,0 +1,34 @@
# Compile Stage
FROM golang:1.23.1-bullseye AS builder
LABEL stage=gobuilder
# Env
ENV CGO_ENABLE 0
ENV GOOS linux
ENV GOARCH amd64
#ENV GOPROXY https://goproxy.cn,direct
ENV HOME /monibuca
WORKDIR /
RUN git clone -b v5 --depth 1 https://github.com/langhuihui/monibuca
# compile
WORKDIR /monibuca
RUN go build -tags sqlite -o ./build/monibuca ./example/default/main.go
RUN cp -r /monibuca/example/default/config.yaml /monibuca/build
# Running Stage
FROM alpine:latest
WORKDIR /monibuca
COPY --from=builder /monibuca/build /monibuca/
RUN cp -r ./config.yaml /etc/monibuca
# Export necessary ports
EXPOSE 8080 8443 1935 554 5060 9000-20000
EXPOSE 5060/udp
CMD [ "./monibuca", "-c", "/etc/monibuca/config.yaml" ]

View File

@@ -17,6 +17,7 @@ import (
_ "m7s.live/m7s/v5/plugin/stress"
_ "m7s.live/m7s/v5/plugin/transcode"
_ "m7s.live/m7s/v5/plugin/webrtc"
_ "m7s.live/m7s/v5/plugin/sei"
)
func main() {

View File

@@ -1,9 +1,10 @@
package config
import (
"m7s.live/m7s/v5/pkg/util"
"net/url"
"time"
"m7s.live/m7s/v5/pkg/util"
)
type (
@@ -56,13 +57,14 @@ type (
Fragment time.Duration `desc:"分片时长"` // 分片时长
Append bool `desc:"是否追加录制"` // 是否追加录制
}
TransfromOutput struct {
Target string `desc:"转码目标"` // 转码目标
StreamPath string
Conf any
}
Transform struct {
Input any
Output []struct {
Target string `desc:"转码目标"` // 转码目标
StreamPath string
Conf any
}
Output []TransfromOutput
}
OnPublish struct {
Push map[Regexp]Push

View File

@@ -57,6 +57,7 @@ func (r *RawAudio) Demux(ctx codec.ICodecCtx) (any, error) {
func (r *RawAudio) Mux(ctx codec.ICodecCtx, frame *AVFrame) {
r.InitRecycleIndexes(0)
r.FourCC = ctx.FourCC()
r.Memory = frame.Raw.(util.Memory)
r.Timestamp = frame.Timestamp
}
@@ -158,6 +159,7 @@ func (h *H26xFrame) Demux(ctx codec.ICodecCtx) (any, error) {
}
func (h *H26xFrame) Mux(ctx codec.ICodecCtx, frame *AVFrame) {
h.FourCC = ctx.FourCC()
h.Nalus = frame.Raw.(Nalus)
h.Timestamp = frame.Timestamp
h.CTS = frame.CTS

View File

@@ -79,23 +79,8 @@ func (mt *Job) RangeSubTask(callback func(task ITask) bool) {
}
}
func (mt *Job) AddTaskLazy(t IJob) {
task := t.GetTask()
task.parent = mt
task.handler = t
}
func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
mt.lazyRun.Do(func() {
if mt.parent != nil && mt.Context == nil {
mt.parent.AddTask(mt.handler) //from lazy
}
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run()
})
if task = t.GetTask(); task.Context == nil {
task.parentCtx = mt.Context
if task = t.GetTask(); t != task.handler { // first add
for _, o := range opt {
switch v := o.(type) {
case context.Context:
@@ -108,10 +93,29 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
task.Logger = v
}
}
if task.parentCtx == nil {
panic("context is nil")
}
task.parent = mt
task.handler = t
switch t.(type) {
case TaskStarter, TaskBlock, TaskGo:
// need start now
default:
// lazy start
return
}
}
mt.lazyRun.Do(func() {
if mt.parent != nil && mt.Context == nil {
mt.parent.AddTask(mt.handler) // second add, lazy start
}
mt.childrenDisposed = make(chan struct{})
mt.addSub = make(chan ITask, 10)
go mt.run()
})
if task.Context == nil {
if task.parentCtx == nil {
task.parentCtx = mt.Context
}
task.level = mt.level + 1
if task.ID == 0 {
task.ID = GetNextTaskID()

View File

@@ -23,7 +23,7 @@ type WriteFlvMetaTagQueueTask struct {
var writeMetaTagQueueTask WriteFlvMetaTagQueueTask
func init() {
m7s.Servers.AddTaskLazy(&writeMetaTagQueueTask)
m7s.Servers.AddTask(&writeMetaTagQueueTask)
}
type writeMetaTagTask struct {

View File

@@ -52,7 +52,7 @@ func (task *writeTrailerTask) Start() (err error) {
}
func init() {
m7s.Servers.AddTaskLazy(&writeTrailerQueueTask)
m7s.Servers.AddTask(&writeTrailerQueueTask)
}
func NewRecorder() m7s.IRecorder {

73
plugin/sei/index.go Normal file
View File

@@ -0,0 +1,73 @@
package plugin_sei
import (
"io"
"net/http"
"strconv"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
sei "m7s.live/m7s/v5/plugin/sei/pkg"
)
var _ = m7s.InstallPlugin[SEIPlugin](sei.NewTransform)
type SEIPlugin struct {
m7s.Plugin
}
func (conf *SEIPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/api/insert/{streamPath...}": conf.api_insert,
}
}
func (conf *SEIPlugin) api_insert(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
streamPath := r.PathValue("streamPath")
targetStreamPath := q.Get("targetStreamPath")
if targetStreamPath == "" {
targetStreamPath = streamPath + "/sei"
}
ok := conf.Server.Streams.Has(streamPath)
if !ok {
util.ReturnError(util.APIErrorNoStream, streamPath+" not found", w, r)
return
}
var transformer *sei.Transformer
if tm, ok := conf.Server.Transforms.Transformed.Get(targetStreamPath); ok {
transformer, ok = tm.TransformJob.Transformer.(*sei.Transformer)
if !ok {
util.ReturnError(util.APIErrorPublish, "targetStreamPath is not a sei transformer", w, r)
return
}
} else {
transformer = sei.NewTransform().(*sei.Transformer)
conf.Transform(streamPath, config.Transform{
Output: []config.TransfromOutput{
{
Target: targetStreamPath,
StreamPath: targetStreamPath,
},
},
})
}
t := q.Get("type")
tb, err := strconv.ParseInt(t, 10, 8)
if err != nil {
if t == "" {
tb = 5
} else {
util.ReturnError(util.APIErrorQueryParse, "type must a number", w, r)
return
}
}
sei, err := io.ReadAll(r.Body)
if err != nil {
util.ReturnError(util.APIErrorNoBody, err.Error(), w, r)
return
}
transformer.AddSEI(byte(tb), sei)
util.ReturnOK(w, r)
}

View File

@@ -0,0 +1,90 @@
package sei
import (
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type Transformer struct {
m7s.DefaultTransformer
data chan util.Buffer
allocator *util.ScalableMemoryAllocator
}
func (t *Transformer) AddSEI(tp byte, data []byte) {
l := len(data)
var buffer util.Buffer
buffer.WriteByte(tp)
for l >= 255 {
buffer.WriteByte(255)
l -= 255
}
buffer.WriteByte(byte(l))
buffer.Write(data)
buffer.WriteByte(0x80)
if len(t.data) == cap(t.data) {
<-t.data
}
t.data <- buffer
}
func NewTransform() m7s.ITransformer {
ret := &Transformer{
data: make(chan util.Buffer, 10),
allocator: util.NewScalableMemoryAllocator(1 << util.MinPowerOf2),
}
return ret
}
func (t *Transformer) Start() (err error) {
return t.TransformJob.Subscribe()
}
func (t *Transformer) Run() (err error) {
err = t.TransformJob.Publish(t.TransformJob.Config.Output[0].StreamPath)
if err != nil {
return
}
m7s.PlayBlock(t.TransformJob.Subscriber, func(audio *pkg.RawAudio) (err error) {
copyAudio := &pkg.RawAudio{
FourCC: audio.FourCC,
Timestamp: audio.Timestamp,
}
copyAudio.SetAllocator(t.allocator)
audio.Memory.Range(func(b []byte) {
copy(copyAudio.NextN(len(b)), b)
})
return t.TransformJob.Publisher.WriteAudio(copyAudio)
}, func(video *pkg.H26xFrame) (err error) {
copyVideo := &pkg.H26xFrame{
FourCC: video.FourCC,
CTS: video.CTS,
Timestamp: video.Timestamp,
}
copyVideo.SetAllocator(t.allocator)
if len(t.data) > 0 {
for seiFrame := range t.data {
switch video.FourCC {
case codec.FourCC_H264:
var seiNalu util.Memory
seiNalu.Append([]byte{byte(codec.NALU_SEI)}, seiFrame)
copyVideo.Nalus = append(copyVideo.Nalus, seiNalu)
}
for _, nalu := range video.Nalus {
mem := copyVideo.NextN(nalu.Size)
copy(mem, nalu.ToBytes())
copyVideo.Nalus.Append(mem)
}
}
}
return t.TransformJob.Publisher.WriteVideo(copyVideo)
})
return
}
func (t *Transformer) Dispose() {
close(t.data)
t.allocator.Recycle()
}

View File

@@ -246,12 +246,12 @@ func (s *Server) Start() (err error) {
return
}
}
s.AddTaskLazy(&s.Records)
s.AddTaskLazy(&s.Streams)
s.AddTaskLazy(&s.Pulls)
s.AddTaskLazy(&s.Pushs)
s.AddTaskLazy(&s.Transforms)
s.AddTaskLazy(&s.Devices)
s.AddTask(&s.Records)
s.AddTask(&s.Streams)
s.AddTask(&s.Pulls)
s.AddTask(&s.Pushs)
s.AddTask(&s.Transforms)
s.AddTask(&s.Devices)
promReg := prometheus.NewPedanticRegistry()
promReg.MustRegister(s)
for _, plugin := range plugins {

View File

@@ -20,7 +20,7 @@ type (
Plugin *Plugin
Publisher *Publisher
Subscriber *Subscriber
transformer ITransformer
Transformer ITransformer
}
DefaultTransformer struct {
task.Job
@@ -49,12 +49,12 @@ func (p *TransformJob) GetKey() string {
}
func (p *TransformJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.Subscribe(p.transformer, p.StreamPath)
p.Subscriber, err = p.Plugin.Subscribe(p.Transformer, p.StreamPath)
return
}
func (p *TransformJob) Publish(streamPath string) (err error) {
p.Publisher, err = p.Plugin.Publish(p.transformer, streamPath)
p.Publisher, err = p.Plugin.Publish(p.Transformer, streamPath)
return
}
@@ -62,7 +62,7 @@ func (p *TransformJob) Init(transformer ITransformer, plugin *Plugin, streamPath
p.Plugin = plugin
p.Config = conf
p.StreamPath = streamPath
p.transformer = transformer
p.Transformer = transformer
p.Description = map[string]any{
"streamPath": streamPath,
"conf": conf,
@@ -89,7 +89,7 @@ func (p *TransformJob) Start() (err error) {
})
}
}
p.AddTask(p.transformer, p.Logger)
p.AddTask(p.Transformer, p.Logger)
return
}