From 24fa98bdf3c55374e8a50edcd2201de3b86ed717 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Tue, 13 Aug 2024 20:11:47 +0800 Subject: [PATCH] fix: flv record --- api.go | 16 +-- example/default/config.yaml | 8 +- example/default/recordflv.yaml | 4 +- pkg/av-reader.go | 5 +- pkg/error.go | 5 +- pkg/log.go | 3 +- pkg/ring-writer.go | 6 +- pkg/track.go | 2 +- pkg/{ => util}/task-channel.go | 2 +- pkg/{ => util}/task-macro.go | 5 +- pkg/{ => util}/task-retry.go | 5 +- pkg/{ => util}/task.go | 10 +- pkg/{ => util}/task_test.go | 5 +- plugin.go | 80 +++++++----- plugin/flv/pkg/flv.go | 53 +++++--- plugin/flv/pkg/pull.go | 25 ++-- plugin/flv/pkg/record.go | 199 ++++++++++++++++-------------- plugin/gb28181/device.go | 3 +- plugin/preview/index.go | 6 +- plugin/rtmp/pkg/net-connection.go | 4 +- plugin/rtmp/pkg/net-stream.go | 4 +- plugin/rtsp/pkg/connection.go | 3 +- plugin/vmlog/handler.go | 11 -- plugin/vmlog/index.go | 14 ++- plugin/webrtc/index.go | 3 +- plugin/webrtc/pkg/connection.go | 4 +- publisher.go | 29 +++-- puller.go | 6 +- pusher.go | 4 +- recoder.go | 11 +- server.go | 24 ++-- subscriber.go | 32 +++-- 32 files changed, 339 insertions(+), 252 deletions(-) rename pkg/{ => util}/task-channel.go (95%) rename pkg/{ => util}/task-macro.go (97%) rename pkg/{ => util}/task-retry.go (92%) rename pkg/{ => util}/task.go (93%) rename pkg/{ => util}/task_test.go (98%) diff --git a/api.go b/api.go index 7bfb78f..2c6f904 100644 --- a/api.go +++ b/api.go @@ -131,7 +131,7 @@ func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err } func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamInfoResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { if pub, ok := s.Streams.Get(req.StreamPath); ok { res, err = s.getStreamInfo(pub) } else { @@ -142,7 +142,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res return } func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest) (res *pb.SubscribersResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { var subscribers []*pb.SubscriberSnapShot for subscriber := range s.Subscribers.Range { meta, _ := json.Marshal(subscriber.Description) @@ -178,7 +178,7 @@ func (s *Server) GetSubscribers(ctx context.Context, req *pb.SubscribersRequest) return } func (s *Server) AudioTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() { res = &pb.TrackSnapShotResponse{} for _, memlist := range pub.AudioTrack.Allocator.GetChildren() { @@ -257,7 +257,7 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) { } func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() { res = &pb.TrackSnapShotResponse{} for _, memlist := range pub.VideoTrack.Allocator.GetChildren() { @@ -324,7 +324,7 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt } func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { if subscriber, ok := s.Subscribers.Get(req.Id); ok { if pub, ok := s.Streams.Get(req.StreamPath); ok { subscriber.Publisher.RemoveSubscriber(subscriber) @@ -340,7 +340,7 @@ func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeReq } func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { if subscriber, ok := s.Subscribers.Get(req.Id); ok { subscriber.Stop(errors.New("stop by api")) } else { @@ -353,7 +353,7 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res // /api/stream/list func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { var streams []*pb.StreamInfoResponse for publisher := range s.Streams.Range { info, err := s.getStreamInfo(publisher) @@ -369,7 +369,7 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res * } func (s *Server) WaitList(context.Context, *emptypb.Empty) (res *pb.StreamWaitListResponse, err error) { - s.streamTask.Call(func(*pkg.Task) error { + s.streamTask.Call(func(*util.Task) error { res = &pb.StreamWaitListResponse{ List: make(map[string]int32), } diff --git a/example/default/config.yaml b/example/default/config.yaml index 203ba5f..a7d05e5 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -25,10 +25,10 @@ rtmp: pull: pullonsub: # live/pull: rtmp://localhost/live/test -# flv: -# pull: -# pullonstart: -# live/test: /Users/dexter/Movies/jb-demo.flv +flv: + pull: + pullonstart: + live/test: /Users/dexter/project/v5/monibuca/example/default/record/live/test.flv gb28181: sip: listenaddr: diff --git a/example/default/recordflv.yaml b/example/default/recordflv.yaml index a9eb186..a6cb9fa 100644 --- a/example/default/recordflv.yaml +++ b/example/default/recordflv.yaml @@ -3,6 +3,6 @@ global: flv: record: enableregexp: true - fragment: 10s +# fragment: 10s recordlist: - .+: record/$0.flv \ No newline at end of file + .+: record/$0 \ No newline at end of file diff --git a/pkg/av-reader.go b/pkg/av-reader.go index e71d118..ffca69e 100644 --- a/pkg/av-reader.go +++ b/pkg/av-reader.go @@ -5,6 +5,7 @@ import ( "log/slog" "m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/config" + "m7s.live/m7s/v5/pkg/util" "time" ) @@ -22,6 +23,7 @@ const ( ) type AVRingReader struct { + *slog.Logger RingReader Track *AVTrack State byte @@ -34,7 +36,6 @@ type AVRingReader struct { startTime time.Time AbsTime uint32 Delay uint32 - *slog.Logger } func (r *AVRingReader) DecConfChanged() bool { @@ -160,7 +161,7 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) { r.AbsTime = 1 } r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence) - r.Log(context.TODO(), TraceLevel, r.Track.FourCC().String(), "delay", r.Delay) + r.Log(context.TODO(), util.TraceLevel, r.Track.FourCC().String(), "delay", r.Delay) return } diff --git a/pkg/error.go b/pkg/error.go index debf236..958891d 100644 --- a/pkg/error.go +++ b/pkg/error.go @@ -11,6 +11,7 @@ var ( ErrPublishTimeout = errors.New("publish timeout") ErrPublishIdleTimeout = errors.New("publish idle timeout") ErrPublishDelayCloseTimeout = errors.New("publish delay close timeout") + ErrPublishWaitCloseTimeout = errors.New("publish wait close timeout") ErrPushRemoteURLExist = errors.New("push remote url exist") ErrSubscribeTimeout = errors.New("subscribe timeout") ErrRestart = errors.New("restart") @@ -18,6 +19,6 @@ var ( ErrUnsupportCodec = errors.New("unsupport codec") ErrMuted = errors.New("muted") ErrLost = errors.New("lost") - ErrRetryRunOut = errors.New("retry run out") - ErrRecordSamePath = errors.New("record same path") + + ErrRecordSamePath = errors.New("record same path") ) diff --git a/pkg/log.go b/pkg/log.go index c23092c..c0524f5 100644 --- a/pkg/log.go +++ b/pkg/log.go @@ -3,6 +3,7 @@ package pkg import ( "context" "log/slog" + "m7s.live/m7s/v5/pkg/util" "slices" ) @@ -11,7 +12,7 @@ var _ slog.Handler = (*MultiLogHandler)(nil) func ParseLevel(level string) slog.Level { var lv slog.LevelVar if level == "trace" { - lv.Set(TraceLevel) + lv.Set(util.TraceLevel) } else { lv.UnmarshalText([]byte(level)) } diff --git a/pkg/ring-writer.go b/pkg/ring-writer.go index e4ec8a2..9624227 100644 --- a/pkg/ring-writer.go +++ b/pkg/ring-writer.go @@ -139,7 +139,7 @@ func (rb *RingWriter) Step() (normal bool) { isIDR := rb.Value.IDR next := rb.Next() if isIDR { - rb.SLogger.Log(nil, TraceLevel, "add idr") + rb.SLogger.Log(nil, util.TraceLevel, "add idr") rb.PushIDR() } if rb.IDRingList.Len() > 0 { @@ -153,12 +153,12 @@ func (rb *RingWriter) Step() (normal bool) { } } else if next == oldIDR.Value { if nextOld := oldIDR.Next(); nextOld != nil && rb.durationFrom(nextOld.Value) > rb.BufferRange[0] { - rb.SLogger.Log(nil, TraceLevel, "remove old idr") + rb.SLogger.Log(nil, util.TraceLevel, "remove old idr") rb.Lock() rb.IDRingList.Remove(oldIDR) rb.Unlock() } else { - rb.SLogger.Log(nil, TraceLevel, "not enough buffer") + rb.SLogger.Log(nil, util.TraceLevel, "not enough buffer") rb.glow(5) next = rb.Next() } diff --git a/pkg/track.go b/pkg/track.go index 3ac7a32..49179cb 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -116,5 +116,5 @@ func (t *Track) WaitReady() error { } func (t *Track) Trace(msg string, fields ...any) { - t.Log(context.TODO(), TraceLevel, msg, fields...) + t.Log(context.TODO(), util.TraceLevel, msg, fields...) } diff --git a/pkg/task-channel.go b/pkg/util/task-channel.go similarity index 95% rename from pkg/task-channel.go rename to pkg/util/task-channel.go index 590709d..0dd2d88 100644 --- a/pkg/task-channel.go +++ b/pkg/util/task-channel.go @@ -1,4 +1,4 @@ -package pkg +package util import "reflect" diff --git a/pkg/task-macro.go b/pkg/util/task-macro.go similarity index 97% rename from pkg/task-macro.go rename to pkg/util/task-macro.go index 271b0fe..5dcd792 100644 --- a/pkg/task-macro.go +++ b/pkg/util/task-macro.go @@ -1,9 +1,8 @@ -package pkg +package util import ( "context" "log/slog" - "m7s.live/m7s/v5/pkg/util" "os" "reflect" "slices" @@ -86,7 +85,7 @@ func (mt *MarcoTask) lazyStart(t ITask) { task.disposeHandler = EmptyDispose } mt.lazyRun.Do(func() { - mt.shutdown = util.NewPromise(context.Background()) + mt.shutdown = NewPromise(context.Background()) go mt.run() }) mt.addSub <- t diff --git a/pkg/task-retry.go b/pkg/util/task-retry.go similarity index 92% rename from pkg/task-retry.go rename to pkg/util/task-retry.go index ae29bb9..6c5e8f2 100644 --- a/pkg/task-retry.go +++ b/pkg/util/task-retry.go @@ -1,11 +1,14 @@ -package pkg +package util import ( + "errors" "fmt" "reflect" "time" ) +var ErrRetryRunOut = errors.New("retry run out") + type RetryTask struct { Task MaxRetry int diff --git a/pkg/task.go b/pkg/util/task.go similarity index 93% rename from pkg/task.go rename to pkg/util/task.go index 3956954..749dd3e 100644 --- a/pkg/task.go +++ b/pkg/util/task.go @@ -1,4 +1,4 @@ -package pkg +package util import ( "context" @@ -6,8 +6,6 @@ import ( "log/slog" "reflect" "time" - - "m7s.live/m7s/v5/pkg/util" ) const TraceLevel = slog.Level(-8) @@ -50,7 +48,7 @@ type ( afterStartListeners, afterDisposeListeners []func() disposeHandler func() Description map[string]any - startup, shutdown *util.Promise + startup, shutdown *Promise parent *MarcoTask parentCtx context.Context } @@ -137,6 +135,6 @@ func (task *Task) dispose() { func (task *Task) init(ctx context.Context) { task.parentCtx = ctx task.Context, task.CancelCauseFunc = context.WithCancelCause(ctx) - task.startup = util.NewPromise(task.Context) - task.shutdown = util.NewPromise(context.Background()) + task.startup = NewPromise(task.Context) + task.shutdown = NewPromise(context.Background()) } diff --git a/pkg/task_test.go b/pkg/util/task_test.go similarity index 98% rename from pkg/task_test.go rename to pkg/util/task_test.go index de88568..8c1ef9e 100644 --- a/pkg/task_test.go +++ b/pkg/util/task_test.go @@ -1,8 +1,9 @@ -package pkg +package util import ( "context" "log/slog" + "m7s.live/m7s/v5/pkg" "os" "testing" "time" @@ -33,7 +34,7 @@ type retryDemoTask struct { } func (task *retryDemoTask) Start() error { - return ErrRestart + return pkg.ErrRestart } func Test_RetryTask(t *testing.T) { diff --git a/plugin.go b/plugin.go index 344fed0..9947609 100644 --- a/plugin.go +++ b/plugin.go @@ -116,8 +116,9 @@ type iPlugin interface { } type IPlugin interface { - ITask + util.ITask OnInit() error + OnStop() } type IRegisterHandler interface { @@ -182,7 +183,7 @@ func InstallPlugin[C iPlugin](options ...any) error { } type Plugin struct { - MarcoLongTask + util.MarcoLongTask Disabled bool Meta *PluginMeta config config.Common @@ -250,16 +251,10 @@ func (p *Plugin) assign() { func (p *Plugin) Start() (err error) { s := p.Server - err = p.handler.OnInit() - if err != nil { - p.Error("init", "error", err) - return - } if p.Meta.ServiceDesc != nil && s.grpcServer != nil { s.grpcServer.RegisterService(p.Meta.ServiceDesc, p.handler) if p.Meta.RegisterGRPCHandler != nil { if err = p.Meta.RegisterGRPCHandler(p.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn); err != nil { - p.Error("init", "error", err) return } else { p.Info("grpc handler registered") @@ -267,17 +262,25 @@ func (p *Plugin) Start() (err error) { } } s.Plugins.Add(p) - p.listen() + err = p.listen() + if err != nil { + return + } + err = p.handler.OnInit() + if err != nil { + return + } return } func (p *Plugin) Dispose() { - p.Server.Plugins.Remove(p) + p.handler.OnStop() p.config.HTTP.StopListen() p.config.TCP.StopListen() + p.Server.Plugins.Remove(p) } -func (p *Plugin) listen() { +func (p *Plugin) listen() (err error) { httpConf := &p.config.HTTP if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.Server.config.HTTP.ListenAddrTLS) { p.Info("https listen at ", "addr", httpConf.ListenAddrTLS) @@ -292,44 +295,49 @@ func (p *Plugin) listen() { }() } + defer func() { + if err != nil { + p.config.HTTP.StopListen() + } + }() + if tcphandler, ok := p.handler.(ITCPPlugin); ok { tcpConf := &p.config.TCP if tcpConf.ListenAddr != "" && tcpConf.AutoListen { p.Info("listen tcp", "addr", tcpConf.ListenAddr) - go func() { - err := tcpConf.Listen(tcphandler.OnTCPConnect) - if err != nil { - p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err) - p.Stop(err) - } - }() + err = tcpConf.Listen(tcphandler.OnTCPConnect) + if err != nil { + p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err) + return + } } if tcpConf.ListenAddrTLS != "" && tcpConf.AutoListen { p.Info("listen tcp tls", "addr", tcpConf.ListenAddrTLS) - go func() { - err := tcpConf.ListenTLS(tcphandler.OnTCPConnect) - if err != nil { - p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err) - p.Stop(err) - } - }() + err = tcpConf.ListenTLS(tcphandler.OnTCPConnect) + if err != nil { + p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err) + return + } } + defer func() { + if err != nil { + p.config.TCP.StopListen() + } + }() } if udpHandler, ok := p.handler.(IUDPPlugin); ok { udpConf := &p.config.UDP if udpConf.ListenAddr != "" && udpConf.AutoListen { p.Info("listen udp", "addr", udpConf.ListenAddr) - go func() { - err := udpConf.Listen(udpHandler.OnUDPConnect) - if err != nil { - p.Error("listen udp", "addr", udpConf.ListenAddr, "error", err) - p.Stop(err) - } - }() - + err = udpConf.Listen(udpHandler.OnUDPConnect) + if err != nil { + p.Error("listen udp", "addr", udpConf.ListenAddr, "error", err) + return + } } } + return } func (p *Plugin) OnInit() error { @@ -340,6 +348,10 @@ func (p *Plugin) OnExit() { } +func (p *Plugin) OnStop() { + +} + func (p *Plugin) PublishWithConfig(ctx context.Context, streamPath string, conf config.Publish) (publisher *Publisher, err error) { publisher = createPublisher(p, streamPath, conf) if p.config.EnableAuth { @@ -458,7 +470,7 @@ func (p *Plugin) AddLogHandler(handler slog.Handler) { } func (p *Plugin) SaveConfig() (err error) { - p.Server.Call(func(*Task) (err error) { + p.Server.Call(func(*util.Task) (err error) { if p.Modify == nil { os.Remove(p.settingPath()) return diff --git a/plugin/flv/pkg/flv.go b/plugin/flv/pkg/flv.go index 5a8974f..c2e2a70 100644 --- a/plugin/flv/pkg/flv.go +++ b/plugin/flv/pkg/flv.go @@ -2,10 +2,10 @@ package flv import ( "bufio" + "encoding/binary" "io" "m7s.live/m7s/v5/pkg/util" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" - "net" ) const ( @@ -17,28 +17,51 @@ const ( var FLVHead = []byte{'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0} -func AVCC2FLV(t byte, ts uint32, avcc ...[]byte) (flv net.Buffers) { - b := util.Buffer(make([]byte, 0, 15)) - b.WriteByte(t) - dataSize := util.SizeOfBuffers(avcc) - b.WriteUint24(uint32(dataSize)) - b.WriteUint24(ts) - b.WriteByte(byte(ts >> 24)) - b.WriteUint24(0) - return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11)) +type FlvWriter struct { + io.Writer + buf [15]byte } +func NewFlvWriter(w io.Writer) *FlvWriter { + return &FlvWriter{Writer: w} +} + +func (w *FlvWriter) WriteTag(t byte, ts, dataSize uint32, payload ...[]byte) (err error) { + WriteFLVTagHead(t, ts, dataSize, w.buf[:]) + if _, err = w.Write(w.buf[:11]); err != nil { + return + } + for _, p := range payload { + if _, err = w.Write(p); err != nil { + return + } + } + binary.BigEndian.PutUint32(w.buf[11:], dataSize+11) + _, err = w.Write(w.buf[11:]) + return +} + +//func AVCC2FLV(t byte, ts uint32, avcc ...[]byte) (flv net.Buffers) { +// b := util.Buffer(make([]byte, 0, 15)) +// b.WriteByte(t) +// dataSize := util.SizeOfBuffers(avcc) +// b.WriteUint24(uint32(dataSize)) +// b.WriteUint24(ts) +// b.WriteByte(byte(ts >> 24)) +// b.WriteUint24(0) +// return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11)) +//} + func WriteFLVTagHead(t uint8, ts, dataSize uint32, b []byte) { b[0] = t b[1], b[2], b[3] = byte(dataSize>>16), byte(dataSize>>8), byte(dataSize) b[4], b[5], b[6], b[7] = byte(ts>>16), byte(ts>>8), byte(ts), byte(ts>>24) } -func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload ...[]byte) (err error) { - buffers := AVCC2FLV(t, timestamp, payload...) - _, err = buffers.WriteTo(w) - return -} +//func WriteFLVTag(w io.Writer, t byte, timestamp uint32, payload ...[]byte) (n int64, err error) { +// buffers := AVCC2FLV(t, timestamp, payload...) +// return buffers.WriteTo(w) +//} func ReadMetaData(reader io.Reader) (metaData rtmp.EcmaArray, err error) { r := bufio.NewReader(reader) diff --git a/plugin/flv/pkg/pull.go b/plugin/flv/pkg/pull.go index b9f26d6..d78de25 100644 --- a/plugin/flv/pkg/pull.go +++ b/plugin/flv/pkg/pull.go @@ -93,13 +93,9 @@ func PullFLV(p *m7s.PullContext) (err error) { return err } var frame rtmp.RTMPData - switch ds := int(dataSize); t { - case FLV_TAG_TYPE_AUDIO, FLV_TAG_TYPE_VIDEO: - frame.SetAllocator(allocator) - err = reader.ReadNto(ds, frame.NextN(ds)) - default: - err = reader.Skip(ds) - } + ds := int(dataSize) + frame.SetAllocator(allocator) + err = reader.ReadNto(ds, frame.NextN(ds)) if err != nil { return err } @@ -112,7 +108,20 @@ func PullFLV(p *m7s.PullContext) (err error) { case FLV_TAG_TYPE_VIDEO: err = p.Publisher.WriteVideo(frame.WrapVideo()) case FLV_TAG_TYPE_SCRIPT: - p.Info("script") + r := frame.NewReader() + amf := &rtmp.AMF{ + Buffer: util.Buffer(r.ToBytes()), + } + var obj any + obj, err = amf.Unmarshal() + name := obj + obj, err = amf.Unmarshal() + metaData := obj + frame.Recycle() + if err != nil { + return err + } + p.Info("script", name, metaData) } } return diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index bf02626..e50f9a4 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -1,25 +1,29 @@ package flv import ( + "fmt" "io" "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/util" rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" "os" + "path/filepath" "slices" "time" ) -var writeMetaTagQueueTask pkg.MarcoLongTask +var writeMetaTagQueueTask util.MarcoLongTask func init() { - pkg.RootTask.AddTask(&writeMetaTagQueueTask) + writeMetaTagQueueTask.Name = "writeMetaTagQueue" + util.RootTask.AddTask(&writeMetaTagQueueTask) } type writeMetaTagTask struct { - pkg.Task + util.Task file *os.File + writer *FlvWriter flags byte metaData []byte } @@ -46,7 +50,8 @@ func (task *writeMetaTagTask) Start() (err error) { task.Error(err.Error()) return } - err = WriteFLVTag(tempFile, FLV_TAG_TYPE_SCRIPT, 0, task.metaData) + task.writer = NewFlvWriter(tempFile) + err = task.writer.WriteTag(FLV_TAG_TYPE_SCRIPT, 0, uint32(len(task.metaData)), task.metaData) _, err = task.file.Seek(13, io.SeekStart) if err != nil { task.Error("writeMetaData Seek failed", "err", err) @@ -67,71 +72,78 @@ func (task *writeMetaTagTask) Start() (err error) { } } +func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64, times []float64, duration *int64) { + ar, vr := suber.AudioReader, suber.VideoReader + hasAudio, hasVideo := ar != nil, vr != nil + var amf rtmp.AMF + metaData := rtmp.EcmaArray{ + "MetaDataCreator": "m7s/" + m7s.Version, + "hasVideo": hasVideo, + "hasAudio": hasAudio, + "hasMatadata": true, + "canSeekToEnd": true, + "duration": float64(*duration) / 1000, + "hasKeyFrames": len(filepositions) > 0, + "filesize": 0, + } + var flags byte + if hasAudio { + ctx := ar.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx) + flags |= (1 << 2) + metaData["audiocodecid"] = int(rtmp.ParseAudioCodec(ctx.FourCC())) + metaData["audiosamplerate"] = ctx.GetSampleRate() + metaData["audiosamplesize"] = ctx.GetSampleSize() + metaData["stereo"] = ctx.GetChannels() == 2 + } + if hasVideo { + ctx := vr.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx) + flags |= 1 + metaData["videocodecid"] = int(rtmp.ParseVideoCodec(ctx.FourCC())) + metaData["width"] = ctx.Width() + metaData["height"] = ctx.Height() + metaData["framerate"] = vr.Track.FPS + metaData["videodatarate"] = vr.Track.BPS + metaData["keyframes"] = map[string]any{ + "filepositions": filepositions, + "times": times, + } + } + amf.Marshals("onMetaData", metaData) + offset := amf.Len() + 13 + 15 + if keyframesCount := len(filepositions); keyframesCount > 0 { + metaData["filesize"] = uint64(offset) + filepositions[keyframesCount-1] + for i := range filepositions { + filepositions[i] += uint64(offset) + } + metaData["keyframes"] = map[string]any{ + "filepositions": filepositions, + "times": times, + } + } + amf.Reset() + marshals := amf.Marshals("onMetaData", metaData) + task := &writeMetaTagTask{ + file: file, + flags: flags, + metaData: marshals, + } + task.Logger = suber.Logger.With("file", file.Name()) + writeMetaTagQueueTask.AddTask(task) +} + func RecordFlv(ctx *m7s.RecordContext) (err error) { var file *os.File var filepositions []uint64 var times []float64 var offset int64 var duration int64 - if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR|util.Conditoinal(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil { - return - } suber := ctx.Subscriber - ar, vr := suber.AudioReader, suber.VideoReader - hasAudio, hasVideo := ar != nil, vr != nil - writeMetaTag := func(file *os.File, filepositions []uint64, times []float64) { - var amf rtmp.AMF - metaData := rtmp.EcmaArray{ - "MetaDataCreator": "m7s/" + m7s.Version, - "hasVideo": hasVideo, - "hasAudio": hasAudio, - "hasMatadata": true, - "canSeekToEnd": true, - "duration": float64(duration) / 1000, - "hasKeyFrames": len(filepositions) > 0, - "filesize": 0, + noFragment := ctx.Fragment == 0 || ctx.Append + if noFragment { + if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR|util.Conditoinal(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil { + return } - var flags byte - if hasAudio { - ctx := ar.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx) - flags |= (1 << 2) - metaData["audiocodecid"] = int(rtmp.ParseAudioCodec(ctx.FourCC())) - metaData["audiosamplerate"] = ctx.GetSampleRate() - metaData["audiosamplesize"] = ctx.GetSampleSize() - metaData["stereo"] = ctx.GetChannels() == 2 - } - if hasVideo { - ctx := vr.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx) - flags |= 1 - metaData["videocodecid"] = int(rtmp.ParseVideoCodec(ctx.FourCC())) - metaData["width"] = ctx.Width() - metaData["height"] = ctx.Height() - metaData["framerate"] = vr.Track.FPS - metaData["videodatarate"] = vr.Track.BPS - metaData["keyframes"] = map[string]any{ - "filepositions": filepositions, - "times": times, - } - } - amf.Marshals("onMetaData", metaData) - offset := amf.Len() + 13 + 15 - if keyframesCount := len(filepositions); keyframesCount > 0 { - metaData["filesize"] = uint64(offset) + filepositions[keyframesCount-1] - for i := range filepositions { - filepositions[i] += uint64(offset) - } - metaData["keyframes"] = map[string]any{ - "filepositions": filepositions, - "times": times, - } - } - amf.Reset() - marshals := amf.Marshals("onMetaData", metaData) - writeMetaTagQueueTask.AddTask(&writeMetaTagTask{ - file: file, - flags: flags, - metaData: marshals, - }) + defer writeMetaTag(file, suber, filepositions, times, &duration) } if ctx.Append { var metaData rtmp.EcmaArray @@ -145,60 +157,67 @@ func RecordFlv(ctx *m7s.RecordContext) (err error) { times = keyframes["times"].([]float64) if _, err = file.Seek(-4, io.SeekEnd); err != nil { ctx.Error("seek file failed", "err", err) - file.Write(FLVHead) + _, err = file.Write(FLVHead) } else { tmp := make(util.Buffer, 4) tmp2 := tmp - file.Read(tmp) + _, err = file.Read(tmp) tagSize := tmp.ReadUint32() tmp = tmp2 - file.Seek(int64(tagSize), io.SeekEnd) - file.Read(tmp2) + _, err = file.Seek(int64(tagSize), io.SeekEnd) + _, err = file.Read(tmp2) ts := tmp2.ReadUint24() | (uint32(tmp[3]) << 24) ctx.Info("append flv", "last tagSize", tagSize, "last ts", ts) - if hasVideo { - vr.StartTs = time.Duration(ts) * time.Millisecond - } - if hasAudio { - ar.StartTs = time.Duration(ts) * time.Millisecond - } - file.Seek(0, io.SeekEnd) + suber.StartAudioTS = time.Duration(ts) * time.Millisecond + suber.StartVideoTS = time.Duration(ts) * time.Millisecond + offset, err = file.Seek(0, io.SeekEnd) } - } else { + } else if ctx.Fragment == 0 { file.Write(FLVHead) - } - if ctx.Fragment == 0 { - defer writeMetaTag(file, filepositions, times) - } - checkFragment := func(absTime uint32) { - if ctx.Fragment == 0 { + } else { + if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil { return } + _, err = file.Write(FLVHead) + } + writer := NewFlvWriter(file) + checkFragment := func(absTime uint32) { if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.Fragment { - writeMetaTag(file, filepositions, times) + writeMetaTag(file, suber, filepositions, times, &duration) filepositions = []uint64{0} times = []float64{0} offset = 0 - if file, err = os.OpenFile(ctx.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil { + if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil { return } - file.Write(FLVHead) - if vr != nil { + _, err = file.Write(FLVHead) + writer = NewFlvWriter(file) + if vr := suber.VideoReader; vr != nil { vr.ResetAbsTime() - err = WriteFLVTag(file, FLV_TAG_TYPE_VIDEO, 0, vr.Track.SequenceFrame.(*rtmp.RTMPVideo).Buffers...) + seq := vr.Track.SequenceFrame.(*rtmp.RTMPVideo) + err = writer.WriteTag(FLV_TAG_TYPE_VIDEO, 0, uint32(seq.Size), seq.Buffers...) + offset = int64(seq.Size + 15) } } } + return m7s.PlayBlock(ctx.Subscriber, func(audio *rtmp.RTMPAudio) (err error) { - if !hasVideo { - checkFragment(ar.AbsTime) + if suber.VideoReader == nil && !noFragment { + checkFragment(suber.AudioReader.AbsTime) } - return WriteFLVTag(file, FLV_TAG_TYPE_AUDIO, vr.AbsTime, audio.Buffers...) + err = writer.WriteTag(FLV_TAG_TYPE_AUDIO, suber.AudioReader.AbsTime, uint32(audio.Size), audio.Buffers...) + offset += int64(audio.Size + 15) + return }, func(video *rtmp.RTMPVideo) (err error) { - if vr.Value.IDR { + if suber.VideoReader.Value.IDR { filepositions = append(filepositions, uint64(offset)) - times = append(times, float64(vr.AbsTime)/1000) + times = append(times, float64(suber.VideoReader.AbsTime)/1000) + if !noFragment { + checkFragment(suber.VideoReader.AbsTime) + } } - return WriteFLVTag(file, FLV_TAG_TYPE_VIDEO, vr.AbsTime, video.Buffers...) + err = writer.WriteTag(FLV_TAG_TYPE_VIDEO, suber.VideoReader.AbsTime, uint32(video.Size), video.Buffers...) + offset += int64(video.Size + 15) + return }) } diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 7114c67..45101e7 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -5,7 +5,6 @@ import ( "github.com/emiago/sipgo/sip" "log/slog" "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/util" gb28181 "m7s.live/m7s/v5/plugin/gb28181/pkg" "net/http" @@ -24,7 +23,7 @@ const ( ) type Device struct { - pkg.Task + util.Task ID string Name string Manufacturer string diff --git a/plugin/preview/index.go b/plugin/preview/index.go index b94b675..c2141fe 100644 --- a/plugin/preview/index.go +++ b/plugin/preview/index.go @@ -3,13 +3,13 @@ package plugin_preview import ( "embed" "fmt" + "m7s.live/m7s/v5/pkg/util" "mime" "net/http" "path/filepath" "strings" "m7s.live/m7s/v5" - "m7s.live/m7s/v5/pkg" ) //go:embed ui @@ -24,14 +24,14 @@ var _ = m7s.InstallPlugin[PreviewPlugin]() func (p *PreviewPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" { s := "