From 3c227c4acbccd8108b00aba38db5ef04b8b1ffb1 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Mon, 7 Feb 2022 17:29:36 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D=E5=BC=95=E6=93=8E=E5=8F=98?= =?UTF-8?q?=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 39 ++++++++++++++++++--------------------- netStream.go | 18 ++++++++---------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/main.go b/main.go index aa420e3..d3821a7 100644 --- a/main.go +++ b/main.go @@ -10,43 +10,40 @@ import ( ) type RTMPConfig struct { - Publish PublishConfig - Subscribe SubscribeConfig - ListenAddr string - ChunkSize int + Publish PublishConfig + Subscribe SubscribeConfig + TCPConfig + ChunkSize int context.Context cancel context.CancelFunc } var config = &RTMPConfig{ - Publish: DefaultPublishConfig, - Subscribe: DefaultSubscribeConfig, - ChunkSize: 4096, - ListenAddr: ":1935", + Publish: DefaultPublishConfig, + Subscribe: DefaultSubscribeConfig, + ChunkSize: 4096, + TCPConfig: TCPConfig{ListenAddr: ":1935"}, } -func (cfg *RTMPConfig) Update(override map[string]any) { - if cfg.cancel == nil || (override != nil && override["ListenAddr"] != nil) { - start() - } -} - -func init() { - InstallPlugin(config) -} - -func start() { +func (cfg *RTMPConfig) Update(override Config) { + override.Unmarshal(cfg) if config.cancel == nil { util.Print(Green("server rtmp start at"), BrightBlue(config.ListenAddr)) - } else { + } else if override.Has("ListenAddr") { config.cancel() util.Print(Green("server rtmp restart at"), BrightBlue(config.ListenAddr)) + } else { + return } config.Context, config.cancel = context.WithCancel(Ctx) - err := util.ListenTCP(config.ListenAddr, config) + err := cfg.Listen(cfg) if err == context.Canceled { log.Println(err) } else { log.Fatal(err) } } + +func init() { + InstallPlugin(config) +} diff --git a/netStream.go b/netStream.go index b544d85..675da52 100644 --- a/netStream.go +++ b/netStream.go @@ -14,7 +14,7 @@ import ( var gstreamid = uint32(64) -func (cfg *RTMPConfig) Process(conn *net.TCPConn) { +func (cfg *RTMPConfig) ServeTCP(conn *net.TCPConn) { nc := NetConnection{ TCPConn: conn, Reader: bufio.NewReader(conn), @@ -132,28 +132,26 @@ func (cfg *RTMPConfig) Process(conn *net.TCPConn) { err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN) err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) - vt, at := subscriber.WaitVideoTrack("h264", "h265"), subscriber.WaitAudioTrack("aac", "pcma", "pcmu") + vt, at := subscriber.WaitVideoTrack(), subscriber.WaitAudioTrack() if vt != nil { frame := vt.DecoderConfiguration err = nc.sendAVMessage(0, frame.AVCC, false, true) - subscriber.OnVideo = func(frame *engine.VideoFrame) bool { - err = nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false) - return err == nil + subscriber.OnVideo = func(frame *engine.VideoFrame) error { + return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false) } } if at != nil { - subscriber.OnAudio = func(frame *engine.AudioFrame) bool { + subscriber.OnAudio = func(frame *engine.AudioFrame) (err error) { if at.CodecID == codec.CodecID_AAC { frame := at.DecoderConfiguration err = nc.sendAVMessage(0, frame.AVCC, true, true) } else { err = nc.sendAVMessage(0, frame.AVCC, true, true) } - subscriber.OnAudio = func(frame *engine.AudioFrame) bool { - err = nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false) - return err == nil + subscriber.OnAudio = func(frame *engine.AudioFrame) error { + return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false) } - return err == nil + return } } go subscriber.Play(at, vt)