diff --git a/example/default/config.yaml b/example/default/config.yaml index ef79823..ff39549 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,5 +1,6 @@ global: loglevel: debug rtmp: - publish: - # pubvideo: false \ No newline at end of file + subscribe: + # submode: 1 + subaudio: false \ No newline at end of file diff --git a/example/default/main.go b/example/default/main.go index 9f5269c..5df328e 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -5,6 +5,7 @@ import ( "m7s.live/m7s/v5" _ "m7s.live/m7s/v5/plugin/rtmp" + _ "m7s.live/m7s/v5/plugin/hdl" ) func main() { diff --git a/pkg/avframe.go b/pkg/avframe.go index 403489f..82a9bc3 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -95,6 +95,7 @@ type IAVFrame interface { GetTimestamp() time.Duration Recycle() IsIDR() bool + Print() string } type Nalu [][]byte diff --git a/pkg/util/pool.go b/pkg/util/pool.go index afd7e18..394d4c7 100644 --- a/pkg/util/pool.go +++ b/pkg/util/pool.go @@ -1,6 +1,8 @@ package util -import "net" +import ( + "net" +) type Pool[T any] struct { pool []T @@ -43,3 +45,27 @@ func (r *RecyclableMemory) Recycle() { } } } + +type BytesPool struct { + Pool[[]byte] + ItemSize int +} + +func (bp *BytesPool) GetN(size int) []byte { + if size != bp.ItemSize { + return make([]byte, size) + } + ret := bp.Pool.Get() + if ret == nil { + return make([]byte, size) + } + return ret +} + +func (bp *BytesPool) Put(b []byte) { + if cap(b) != bp.ItemSize { + bp.ItemSize = cap(b) + bp.Clear() + } + bp.Pool.Put(b) +} diff --git a/plugin.go b/plugin.go index 7635291..5bfb062 100644 --- a/plugin.go +++ b/plugin.go @@ -3,6 +3,7 @@ package m7s import ( "context" "net" + "net/http" "os" "path/filepath" "reflect" @@ -72,7 +73,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { } p.Info("init", "version", plugin.Version) instance.OnInit() - go p.Start() + p.Start() } type iPlugin interface { @@ -145,8 +146,8 @@ func (p *Plugin) GetCommonConf() *config.Common { return &p.config } -func (opt *Plugin) settingPath() string { - return filepath.Join(opt.server.SettingDir, strings.ToLower(opt.Meta.Name)+".yaml") +func (p *Plugin) settingPath() string { + return filepath.Join(p.server.SettingDir, strings.ToLower(p.Meta.Name)+".yaml") } func (p *Plugin) assign() { @@ -160,7 +161,7 @@ func (p *Plugin) assign() { } p.Config.ParseModifyFile(modifyConfig) } - // p.registerHandler() + p.registerHandler() } func (p *Plugin) Stop(err error) { @@ -172,14 +173,14 @@ func (p *Plugin) Stop(err error) { func (p *Plugin) Start() { httpConf := p.config.HTTP if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.server.config.HTTP.ListenAddrTLS) { + p.Info("https listen at ", "addr", httpConf.ListenAddrTLS) go func() { - p.Info("https listen at ", "addr", httpConf.ListenAddrTLS) p.Stop(httpConf.ListenTLS()) }() } if httpConf.ListenAddr != "" && (httpConf.ListenAddr != p.server.config.HTTP.ListenAddr) { + p.Info("http listen at ", "addr", httpConf.ListenAddr) go func() { - p.Info("http listen at ", "addr", httpConf.ListenAddr) p.Stop(httpConf.Listen()) }() } @@ -191,21 +192,23 @@ func (p *Plugin) Start() { if p.config.TCP.ListenAddr != "" { p.Info("listen tcp", "addr", tcpConf.ListenAddr) - err := tcpConf.Listen(tcphandler.OnTCPConnect) - if err != nil { - p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err) - p.Stop(err) - return - } + go func() { + err := tcpConf.Listen(tcphandler.OnTCPConnect) + if err != nil { + p.Error("listen tcp", "addr", tcpConf.ListenAddr, "error", err) + p.Stop(err) + } + }() } if tcpConf.ListenAddrTLS != "" { p.Info("listen tcp tls", "addr", tcpConf.ListenAddrTLS) - err := tcpConf.ListenTLS(tcphandler.OnTCPConnect) - if err != nil { - p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err) - p.Stop(err) - return - } + go func() { + err := tcpConf.ListenTLS(tcphandler.OnTCPConnect) + if err != nil { + p.Error("listen tcp tls", "addr", tcpConf.ListenAddrTLS, "error", err) + p.Stop(err) + } + }() } } @@ -244,3 +247,49 @@ func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subsc err = sendPromiseToServer(p.server, subscriber) return } + +func (p *Plugin) registerHandler() { + t := reflect.TypeOf(p.handler) + v := reflect.ValueOf(p.handler) + // 注册http响应 + for i, j := 0, t.NumMethod(); i < j; i++ { + name := t.Method(i).Name + if name == "ServeHTTP" { + continue + } + switch handler := v.Method(i).Interface().(type) { + case func(http.ResponseWriter, *http.Request): + patten := strings.ToLower(strings.ReplaceAll(name, "_", "/")) + p.handle(patten, http.HandlerFunc(handler)) + } + } + if rootHandler, ok := p.handler.(http.Handler); ok { + p.handle("/", rootHandler) + } +} + +func (p *Plugin) logHandler(handler http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + p.Debug("visit", "path", r.URL.String(), "remote", r.RemoteAddr) + name := strings.ToLower(p.Meta.Name) + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name) + handler.ServeHTTP(rw, r) + }) +} + +func (p *Plugin) handle(pattern string, handler http.Handler) { + if p == nil { + return + } + if !strings.HasPrefix(pattern, "/") { + pattern = "/" + pattern + } + handler = p.logHandler(handler) + p.GetCommonConf().Handle(pattern, handler) + if p.server != nil { + pattern = "/" + strings.ToLower(p.Meta.Name) + pattern + p.Debug("http handle added to server", "pattern", pattern) + p.server.GetCommonConf().Handle(pattern, handler) + } + // apiList = append(apiList, pattern) +} diff --git a/plugin/demo/index.go b/plugin/demo/index.go index 6feaa01..46f0a3e 100644 --- a/plugin/demo/index.go +++ b/plugin/demo/index.go @@ -6,7 +6,7 @@ import ( "m7s.live/m7s/v5" "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/util" - rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" + "m7s.live/m7s/v5/plugin/rtmp/pkg" ) type AnnexB struct { @@ -48,13 +48,13 @@ type DemoPlugin struct { m7s.Plugin } -func (p *DemoPlugin) OnInit() { - publisher, err := p.Publish("live/demo") - if err == nil { - var annexB AnnexB - publisher.WriteVideo(&annexB) - } -} +// func (p *DemoPlugin) OnInit() { +// publisher, err := p.Publish("live/demo") +// if err == nil { +// var annexB AnnexB +// publisher.WriteVideo(&annexB) +// } +// } func (p *DemoPlugin) OnPublish(publisher *m7s.Publisher) { subscriber, err := p.Subscribe(publisher.StreamPath) diff --git a/plugin/hdl/index.go b/plugin/hdl/index.go new file mode 100644 index 0000000..299367e --- /dev/null +++ b/plugin/hdl/index.go @@ -0,0 +1,116 @@ +package plugin_hdl + +import ( + "io" + "net" + "net/http" + "strings" + "time" + + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/pkg/util" + . "m7s.live/m7s/v5/plugin/hdl/pkg" + rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg" +) + +type HDLPlugin struct { + m7s.Plugin +} + +var _ = m7s.InstallPlugin[HDLPlugin]() + +func (p *HDLPlugin) WriteFlvHeader(sub *m7s.Subscriber, w io.Writer) { + // at, vt := sub.Publisher, sub.Video + // hasAudio, hasVideo := at != nil, vt != nil + // var amf rtmp.AMF + // amf.Marshal("onMetaData") + // metaData := rtmp.EcmaArray{ + // "MetaDataCreator": "m7s" + m7s.Version, + // "hasVideo": hasVideo, + // "hasAudio": hasAudio, + // "hasMatadata": true, + // "canSeekToEnd": false, + // "duration": 0, + // "hasKeyFrames": 0, + // "framerate": 0, + // "videodatarate": 0, + // "filesize": 0, + // } + var flags byte + // if hasAudio { + flags |= (1 << 2) + // metaData["audiocodecid"] = int(at.CodecID) + // metaData["audiosamplerate"] = at.SampleRate + // metaData["audiosamplesize"] = at.SampleSize + // metaData["stereo"] = at.Channels == 2 + // } + // if hasVideo { + flags |= 1 + // metaData["videocodecid"] = int(vt.CodecID) + // metaData["width"] = vt.SPSInfo.Width + // metaData["height"] = vt.SPSInfo.Height + // } + // amf.Marshal(metaData) + // 写入FLV头 + w.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0}) + // codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer) +} + +func (p *HDLPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { + streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv") + if r.URL.RawQuery != "" { + streamPath += "?" + r.URL.RawQuery + } + + sub, err := p.Subscribe(streamPath, w, r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "video/x-flv") + w.Header().Set("Transfer-Encoding", "identity") + w.WriteHeader(http.StatusOK) + wto := p.GetCommonConf().WriteTimeout + var gotFlvTag func(tag *net.Buffers) error + if hijacker, ok := w.(http.Hijacker); ok && wto > 0 { + conn, _, _ := hijacker.Hijack() + conn.SetWriteDeadline(time.Now().Add(wto)) + sub.Closer = conn + p.WriteFlvHeader(sub, conn) + gotFlvTag = func(tag *net.Buffers) (err error) { + conn.SetWriteDeadline(time.Now().Add(wto)) + _, err = tag.WriteTo(conn) + return + } + } else { + w.(http.Flusher).Flush() + p.WriteFlvHeader(sub, w) + gotFlvTag = func(tag *net.Buffers) (err error) { + _, err = tag.WriteTo(w) + return + } + } + b := util.Buffer(make([]byte, 0, 15)) + var flv net.Buffers + sub.Handle(func(audio *rtmp.RTMPAudio) error { + b.Reset() + b.WriteByte(FLV_TAG_TYPE_AUDIO) + dataSize := audio.Length + b.WriteUint24(uint32(dataSize)) + b.WriteUint24(audio.Timestamp) + b.WriteByte(byte(audio.Timestamp >> 24)) + b.WriteUint24(0) + flv = append(append(append(flv, b), audio.Buffers.Buffers...), util.PutBE(b.Malloc(4), dataSize+11)) + return gotFlvTag(&flv) + }, func(video *rtmp.RTMPVideo) error { + b.Reset() + b.WriteByte(FLV_TAG_TYPE_VIDEO) + dataSize := video.Length + b.WriteUint24(uint32(dataSize)) + b.WriteUint24(video.Timestamp) + b.WriteByte(byte(video.Timestamp >> 24)) + b.WriteUint24(0) + flv = append(append(append(flv, b), video.Buffers.Buffers...), util.PutBE(b.Malloc(4), dataSize+11)) + return gotFlvTag(&flv) + }) +} diff --git a/plugin/hdl/pkg/flv.go b/plugin/hdl/pkg/flv.go new file mode 100644 index 0000000..4e8c325 --- /dev/null +++ b/plugin/hdl/pkg/flv.go @@ -0,0 +1,8 @@ +package hdl + +const ( + // FLV Tag Type + FLV_TAG_TYPE_AUDIO = 0x08 + FLV_TAG_TYPE_VIDEO = 0x09 + FLV_TAG_TYPE_SCRIPT = 0x12 +) diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index 4f0092e..3235fe1 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -1,4 +1,4 @@ -package rtmp +package plugin_rtmp import ( "context" @@ -146,8 +146,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error) } else { receivers[cmd.StreamId] = receiver - receiver.Begin() - err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status) + err = receiver.BeginPublish(cmd.TransactionId) } if err != nil { logger.Error("sendMessage publish", "error", err) @@ -164,9 +163,7 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { err = sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error) } else { senders[sender.StreamID] = sender - sender.Begin() - err = sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status) - err = sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status) + sender.BeginPlay(cmd.TransactionId) sender.Init() go sender.Handle(sender.SendAudio, sender.SendVideo) } diff --git a/plugin/rtmp/pkg/amf.go b/plugin/rtmp/pkg/amf.go index da0e48b..a75ce23 100644 --- a/plugin/rtmp/pkg/amf.go +++ b/plugin/rtmp/pkg/amf.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "fmt" diff --git a/plugin/rtmp/pkg/amf3.go b/plugin/rtmp/pkg/amf3.go index eec3080..b08756f 100644 --- a/plugin/rtmp/pkg/amf3.go +++ b/plugin/rtmp/pkg/amf3.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "errors" diff --git a/plugin/rtmp/pkg/audio.go b/plugin/rtmp/pkg/audio.go index 5a8c43c..d37854b 100644 --- a/plugin/rtmp/pkg/audio.go +++ b/plugin/rtmp/pkg/audio.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( . "m7s.live/m7s/v5/pkg" @@ -45,7 +45,8 @@ func (avcc *RTMPAudio) DecodeConfig(track *AVTrack) error { ctx.FrameLengthFlag = (b1 >> 2) & 0x01 ctx.DependsOnCoreCoder = (b1 >> 1) & 0x01 ctx.ExtensionFlag = b1 & 0x01 - ctx.SequenceFrame = avcc + ctx.SequenceFrame = &RTMPAudio{} + ctx.SequenceFrame.ReadFromBytes(avcc.ToBytes()) track.ICodecCtx = &ctx } } diff --git a/plugin/rtmp/pkg/chunk.go b/plugin/rtmp/pkg/chunk.go index e226f56..32406a2 100644 --- a/plugin/rtmp/pkg/chunk.go +++ b/plugin/rtmp/pkg/chunk.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "encoding/binary" diff --git a/plugin/rtmp/pkg/codec.go b/plugin/rtmp/pkg/codec.go index a83871a..e3ceaf3 100644 --- a/plugin/rtmp/pkg/codec.go +++ b/plugin/rtmp/pkg/codec.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "bytes" @@ -353,4 +353,4 @@ type AudioSpecificConfig struct { GASpecificConfig } -var SamplingFrequencies = [...]int{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, 0, 0, 0} \ No newline at end of file +var SamplingFrequencies = [...]int{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, 0, 0, 0} diff --git a/plugin/rtmp/pkg/const.go b/plugin/rtmp/pkg/const.go index c2786b4..4893131 100644 --- a/plugin/rtmp/pkg/const.go +++ b/plugin/rtmp/pkg/const.go @@ -1,6 +1,7 @@ -package pkg +package rtmp import ( + "fmt" "time" "m7s.live/m7s/v5/pkg/util" @@ -24,6 +25,10 @@ type RTMPData struct { util.RecyclableMemory } +func (avcc *RTMPData) Print() string { + return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5]) +} + func (avcc *RTMPData) GetTimestamp() time.Duration { return time.Duration(avcc.Timestamp) * time.Millisecond } diff --git a/plugin/rtmp/pkg/event.go b/plugin/rtmp/pkg/event.go index dc954d5..574f181 100644 --- a/plugin/rtmp/pkg/event.go +++ b/plugin/rtmp/pkg/event.go @@ -1,4 +1,4 @@ -package pkg +package rtmp // http://help.adobe.com/zh_CN/AIR/1.5/jslr/flash/events/NetStatusEvent.html @@ -14,15 +14,15 @@ const ( /* Code */ /* NetStream */ - NetStream_Play_Reset = "NetStream.Play.Reset" // "status" 由播放列表重置导致 - NetStream_Play_Start = "NetStream.Play.Start" // "status" 播放已开始 - NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" // "error" 无法找到传递给 play()方法的 FLV - NetStream_Play_Stop = "NetStream.Play.Stop" // "status" 播放已结束 - NetStream_Play_Failed = "NetStream.Play.Failed" // "error" 出于此表中列出的原因之外的某一原因(例如订阅者没有读取权限),播放发生了错误 - NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" // "status" 发布者已经发布了流 + NetStream_Play_Reset = "NetStream.Play.Reset" // "status" 由播放列表重置导致 + NetStream_Play_Start = "NetStream.Play.Start" // "status" 播放已开始 + NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" // "error" 无法找到传递给 play()方法的 FLV + NetStream_Play_Stop = "NetStream.Play.Stop" // "status" 播放已结束 + NetStream_Play_Failed = "NetStream.Play.Failed" // "error" 出于此表中列出的原因之外的某一原因(例如订阅者没有读取权限),播放发生了错误 + NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" // "status" 发布者已经发布了流 NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" // "status" 发布者已经取消发布了流 - NetStream_Play_Switch = "NetStream.Play.Switch" - NetStream_Play_Complete = "NetStream.Play.Complete" + NetStream_Play_Switch = "NetStream.Play.Switch" + NetStream_Play_Complete = "NetStream.Play.Complete" NetStream_Data_Start = "NetStream.Data.Start" diff --git a/plugin/rtmp/pkg/handshake.go b/plugin/rtmp/pkg/handshake.go index bb71dbb..f4ecc6a 100644 --- a/plugin/rtmp/pkg/handshake.go +++ b/plugin/rtmp/pkg/handshake.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "bytes" diff --git a/plugin/rtmp/pkg/media.go b/plugin/rtmp/pkg/media.go index fe687bb..6f86d6c 100644 --- a/plugin/rtmp/pkg/media.go +++ b/plugin/rtmp/pkg/media.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "errors" @@ -55,6 +55,7 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) { for r.Length > 0 { item := util.Buffer(av.byte16Pool.GetN(16)) defer av.byte16Pool.Put(item) + // item := util.Buffer(make([]byte, 16)) av.WriteTo(RTMP_CHUNK_HEAD_1, &item) // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) chunk = append(chunk, item) @@ -114,33 +115,7 @@ func (r *RTMPSender) SendVideo(video *RTMPVideo) error { return r.video.sendFrame(&video.RTMPData) } -func (r *RTMPSender) Response(tid uint64, code, level string) error { - m := new(ResponsePlayMessage) - m.CommandName = Response_OnStatus - m.TransactionId = tid - m.Infomation = map[string]any{ - "code": code, - "level": level, - "description": "", - } - m.StreamID = r.StreamID - return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m) -} - type RTMPReceiver struct { *m7s.Publisher NetStream } - -func (r *RTMPReceiver) Response(tid uint64, code, level string) error { - m := new(ResponsePublishMessage) - m.CommandName = Response_OnStatus - m.TransactionId = tid - m.Infomation = map[string]any{ - "code": code, - "level": level, - "description": "", - } - m.StreamID = r.StreamID - return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m) -} diff --git a/plugin/rtmp/pkg/msg.go b/plugin/rtmp/pkg/msg.go index aab30dc..261874d 100644 --- a/plugin/rtmp/pkg/msg.go +++ b/plugin/rtmp/pkg/msg.go @@ -1,9 +1,10 @@ -package pkg +package rtmp import ( "encoding/binary" "errors" "strings" + "m7s.live/m7s/v5/pkg/util" ) @@ -182,7 +183,7 @@ func GetRtmpMessage(chunk *Chunk, body util.Buffer) error { // object类型要复杂点. // 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束 func decodeCommandAMF0(chunk *Chunk, body []byte) { - amf := AMF{body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去. + amf := AMF{body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去. cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型. cmdMsg := CommandMessage{ cmd, diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index c8dca30..530c74f 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "bufio" @@ -45,30 +45,6 @@ const ( SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message" ) -type BytesPool struct { - util.Pool[[]byte] - ItemSize int -} - -func (bp *BytesPool) GetN(size int) []byte { - if size != bp.ItemSize { - return make([]byte, size) - } - ret := bp.Pool.Get() - if ret == nil { - return make([]byte, size) - } - return ret -} - -func (bp *BytesPool) Put(b []byte) { - if cap(b) != bp.ItemSize { - bp.ItemSize = cap(b) - bp.Clear() - } - bp.Pool.Put(b) -} - type NetConnection struct { *slog.Logger `json:"-" yaml:"-"` *bufio.Reader `json:"-" yaml:"-"` @@ -85,8 +61,8 @@ type NetConnection struct { AppName string tmpBuf util.Buffer //用来接收/发送小数据,复用内存 chunkHeader util.Buffer - byteChunkPool BytesPool - byte16Pool BytesPool + byteChunkPool util.BytesPool + byte16Pool util.BytesPool writing atomic.Bool // false 可写,true 不可写 } @@ -174,6 +150,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { if unRead := msgLen - chunk.AVData.Length; unRead < needRead { needRead = unRead } + // mem := make([]byte, needRead) mem := conn.byteChunkPool.GetN(needRead) if n, err := conn.ReadFull(mem); err != nil { conn.byteChunkPool.Put(mem) diff --git a/plugin/rtmp/pkg/net-stream.go b/plugin/rtmp/pkg/net-stream.go index 0613a11..104e48e 100644 --- a/plugin/rtmp/pkg/net-stream.go +++ b/plugin/rtmp/pkg/net-stream.go @@ -1,10 +1,40 @@ -package pkg +package rtmp type NetStream struct { *NetConnection StreamID uint32 } -func (ns *NetStream) Begin() { - ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID) +func (ns *NetStream) Response(tid uint64, code, level string) error { + m := new(ResponsePlayMessage) + m.CommandName = Response_OnStatus + m.TransactionId = tid + m.Infomation = map[string]any{ + "code": code, + "level": level, + "description": "", + } + m.StreamID = ns.StreamID + return ns.SendMessage(RTMP_MSG_AMF0_COMMAND, m) +} + +func (ns *NetStream) BeginPublish(tid uint64) error { + err := ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID) + if err != nil { + return err + } + return ns.Response(tid, NetStream_Publish_Start, Level_Status) +} + +func (ns *NetStream) BeginPlay(tid uint64) (err error) { + err = ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID) + if err != nil { + return err + } + err = ns.Response(tid, NetStream_Play_Reset, Level_Status) + if err != nil { + return + } + err = ns.Response(tid, NetStream_Play_Start, Level_Status) + return } diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index 36811fe..5646766 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -1,4 +1,4 @@ -package pkg +package rtmp import ( "time" @@ -36,8 +36,8 @@ func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error { ctx.NalulenSize = int(info.LengthSizeMinusOne&3 + 1) ctx.SPS = info.SequenceParameterSetNALUnit ctx.PPS = info.PictureParameterSetNALUnit - avcc.IPool = nil - ctx.SequenceFrame = avcc + ctx.SequenceFrame = &RTMPVideo{} + ctx.SequenceFrame.ReadFromBytes(avcc.ToBytes()) track.ICodecCtx = &ctx } case "h265": diff --git a/publisher.go b/publisher.go index a624833..48ab4b5 100644 --- a/publisher.go +++ b/publisher.go @@ -93,6 +93,9 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { t.Value.Wrap = data t.Value.Timestamp = data.GetTimestamp() t.Step() + if t.Value.Wrap != nil { + t.Value.Wrap.Recycle() + } } func (p *Publisher) WriteVideo(data IAVFrame) (err error) { @@ -122,7 +125,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence) if t.HistoryRing == nil { if l := t.Size - p.GOP; l > 12 { - t.Debug("resize", "before", t.Size, "after", t.Size-5) + t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5) t.Reduce(5) //缩小缓冲环节省内存 } } diff --git a/server.go b/server.go index 0ae46e2..08b7c8e 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package m7s import ( "context" "log/slog" + "net/http" "os" "path/filepath" "reflect" @@ -58,6 +59,7 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) { s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.config.HTTP.ListenAddrTLS = ":8443" s.config.HTTP.ListenAddr = ":8080" + s.handler = s s.Info("start") var cg map[string]map[string]any @@ -90,6 +92,19 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) { var lv slog.LevelVar lv.UnmarshalText([]byte(s.LogLevel)) slog.SetLogLoggerLevel(lv.Level()) + s.registerHandler() + if s.config.HTTP.ListenAddrTLS != "" { + s.Info("https listen at ", "addr", s.config.HTTP.ListenAddrTLS) + go func() { + s.Stop(s.config.HTTP.ListenTLS()) + }() + } + if s.config.HTTP.ListenAddr != "" { + s.Info("http listen at ", "addr", s.config.HTTP.ListenAddr) + go func() { + s.Stop(s.config.HTTP.Listen()) + }() + } for _, plugin := range plugins { plugin.Init(s, cg[strings.ToLower(plugin.Name)]) } @@ -241,3 +256,6 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error { } return nil } + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { +} diff --git a/subscriber.go b/subscriber.go index 93eaa1e..471445b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -71,10 +71,10 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) { subMode, _ = strconv.Atoi(s.Args.Get(s.SubModeArgName)) } var audioFrame, videoFrame, lastSentAF, lastSentVF *AVFrame - if audioHandler != nil { + if audioHandler != nil && s.SubAudio { a1 = reflect.TypeOf(audioHandler).In(0) } - if videoHandler != nil { + if videoHandler != nil && s.SubVideo { v1 = reflect.TypeOf(videoHandler).In(0) } createAudioReader := func() { @@ -111,13 +111,19 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) { }() sendAudioFrame := func() { lastSentAF = audioFrame - s.Debug("send audio frame", "frame", audioFrame.Sequence) - ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)}) + s.Debug("send audio frame", "seq", audioFrame.Sequence) + res := ah.Call([]reflect.Value{reflect.ValueOf(audioFrame.Wrap)}) + if len(res) > 0 && !res[0].IsNil() { + s.Stop(res[0].Interface().(error)) + } } sendVideoFrame := func() { lastSentVF = videoFrame - s.Debug("send video frame", "frame", videoFrame.Sequence) - vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)}) + s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.Print()) + res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)}) + if len(res) > 0 && !res[0].IsNil() { + s.Stop(res[0].Interface().(error)) + } } for err := s.Err(); err == nil; err = s.Err() { if vr != nil { @@ -135,7 +141,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) { // fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) if videoFrame.Wrap.IsIDR() && vr.DecConfChanged() { vr.LastCodecCtx = vr.Track.ICodecCtx - s.Debug("video codec changed") + s.Debug("video codec changed", "data", vr.Track.ICodecCtx.GetSequenceFrame().Print()) vh.Call([]reflect.Value{reflect.ValueOf(vr.Track.ICodecCtx.GetSequenceFrame())}) } if ar != nil {