diff --git a/internal/core/core.go b/internal/core/core.go index 3021a85e..ab68773c 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -87,7 +87,7 @@ func New(args []string) (*Core, bool) { err = p.createResources(true) if err != nil { - p.Log(logger.Info, "ERR: %s", err) + p.Log(logger.Error, "%s", err) p.closeResources(nil, false) return nil, false } @@ -95,7 +95,7 @@ func New(args []string) (*Core, bool) { if p.confFound { p.confWatcher, err = confwatcher.New(p.confPath) if err != nil { - p.Log(logger.Info, "ERR: %s", err) + p.Log(logger.Error, "%s", err) p.closeResources(nil, false) return nil, false } @@ -144,13 +144,13 @@ outer: newConf, _, err := conf.Load(p.confPath) if err != nil { - p.Log(logger.Info, "ERR: %s", err) + p.Log(logger.Error, "%s", err) break outer } err = p.reloadConf(newConf, false) if err != nil { - p.Log(logger.Info, "ERR: %s", err) + p.Log(logger.Error, "%s", err) break outer } @@ -159,7 +159,7 @@ outer: err := p.reloadConf(newConf, true) if err != nil { - p.Log(logger.Info, "ERR: %s", err) + p.Log(logger.Error, "%s", err) break outer } diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index aef0c14d..c84f04bc 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -3,6 +3,7 @@ package core import ( "bytes" "context" + "errors" "fmt" "io" "net" @@ -195,7 +196,6 @@ func (r *hlsMuxer) PathName() string { func (r *hlsMuxer) run() { defer r.wg.Done() - defer r.log(logger.Info, "destroyed") innerCtx, innerCtxCancel := context.WithCancel(context.Background()) innerReady := make(chan struct{}) @@ -206,36 +206,34 @@ func (r *hlsMuxer) run() { isReady := false -outer: - for { - select { - case <-r.ctx.Done(): - innerCtxCancel() - <-innerErr - break outer + err := func() error { + for { + select { + case <-r.ctx.Done(): + innerCtxCancel() + <-innerErr + return errors.New("terminated") - case req := <-r.request: - if isReady { - req.Res <- r.handleRequest(req) - } else { - r.requests = append(r.requests, req) - } + case req := <-r.request: + if isReady { + req.Res <- r.handleRequest(req) + } else { + r.requests = append(r.requests, req) + } - case <-innerReady: - isReady = true - for _, req := range r.requests { - req.Res <- r.handleRequest(req) - } - r.requests = nil + case <-innerReady: + isReady = true + for _, req := range r.requests { + req.Res <- r.handleRequest(req) + } + r.requests = nil - case err := <-innerErr: - innerCtxCancel() - if err != nil { - r.log(logger.Info, "ERR: %s", err) + case err := <-innerErr: + innerCtxCancel() + return err } - break outer } - } + }() r.ctxCancel() @@ -244,6 +242,8 @@ outer: } r.parent.OnMuxerClose(r) + + r.log(logger.Info, "destroyed (%v)", err) } func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 5c43ae62..76c652bd 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "net" "net/url" "strings" @@ -144,44 +143,45 @@ func (c *rtmpConn) safeState() gortsplib.ServerSessionState { func (c *rtmpConn) run() { defer c.wg.Done() - defer c.log(logger.Info, "closed") - if c.runOnConnect != "" { - c.log(logger.Info, "runOnConnect command started") - _, port, _ := net.SplitHostPort(c.rtspAddress) - onConnectCmd := externalcmd.New(c.runOnConnect, c.runOnConnectRestart, externalcmd.Environment{ - Path: "", - Port: port, - }) + err := func() error { + if c.runOnConnect != "" { + c.log(logger.Info, "runOnConnect command started") + _, port, _ := net.SplitHostPort(c.rtspAddress) + onConnectCmd := externalcmd.New(c.runOnConnect, c.runOnConnectRestart, externalcmd.Environment{ + Path: "", + Port: port, + }) - defer func() { - onConnectCmd.Close() - c.log(logger.Info, "runOnConnect command stopped") - }() - } - - ctx, cancel := context.WithCancel(c.ctx) - runErr := make(chan error) - go func() { - runErr <- c.runInner(ctx) - }() - - select { - case err := <-runErr: - cancel() - - if err != io.EOF { - c.log(logger.Info, "ERR: %s", err) + defer func() { + onConnectCmd.Close() + c.log(logger.Info, "runOnConnect command stopped") + }() } - case <-c.ctx.Done(): - cancel() - <-runErr - } + ctx, cancel := context.WithCancel(c.ctx) + runErr := make(chan error) + go func() { + runErr <- c.runInner(ctx) + }() + + select { + case err := <-runErr: + cancel() + return err + + case <-c.ctx.Done(): + cancel() + <-runErr + return errors.New("terminated") + } + }() c.ctxCancel() c.parent.OnConnClose(c) + + c.log(logger.Info, "closed (%v)", err) } func (c *rtmpConn) runInner(ctx context.Context) error { @@ -505,7 +505,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { switch pkt.Type { case av.H264: if videoTrack == nil { - return fmt.Errorf("ERR: received an H264 frame, but track is not set up") + return fmt.Errorf("received an H264 frame, but track is not set up") } nalus, err := h264.DecodeAVCC(pkt.Data) @@ -532,7 +532,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { pkts, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) if err != nil { - return fmt.Errorf("ERR while encoding H264: %v", err) + return fmt.Errorf("error while encoding H264: %v", err) } bytss := make([][]byte, len(pkts)) @@ -550,12 +550,12 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { case av.AAC: if audioTrack == nil { - return fmt.Errorf("ERR: received an AAC frame, but track is not set up") + return fmt.Errorf("received an AAC frame, but track is not set up") } pkts, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) if err != nil { - return fmt.Errorf("ERR while encoding AAC: %v", err) + return fmt.Errorf("error while encoding AAC: %v", err) } bytss := make([][]byte, len(pkts)) diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index dae9a776..a8e3e832 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -181,7 +181,7 @@ func (s *rtmpSource) runInner() bool { switch pkt.Type { case av.H264: if videoTrack == nil { - return fmt.Errorf("ERR: received an H264 frame, but track is not set up") + return fmt.Errorf("received an H264 frame, but track is not set up") } nalus, err := h264.DecodeAVCC(pkt.Data) @@ -203,7 +203,7 @@ func (s *rtmpSource) runInner() bool { pkts, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) if err != nil { - return fmt.Errorf("ERR while encoding H264: %v", err) + return fmt.Errorf("error while encoding H264: %v", err) } bytss := make([][]byte, len(pkts)) @@ -221,12 +221,12 @@ func (s *rtmpSource) runInner() bool { case av.AAC: if audioTrack == nil { - return fmt.Errorf("ERR: received an AAC frame, but track is not set up") + return fmt.Errorf("received an AAC frame, but track is not set up") } pkts, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) if err != nil { - return fmt.Errorf("ERR while encoding AAC: %v", err) + return fmt.Errorf("error while encoding AAC: %v", err) } bytss := make([][]byte, len(pkts)) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index a596d0bc..9e591feb 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -19,6 +19,7 @@ const ( Debug Level = iota + 1 Info Warn + Error ) // Destination is a log destination. @@ -137,25 +138,33 @@ func writeLevel(buf *bytes.Buffer, level Level, doColor bool) { switch level { case Debug: if doColor { - buf.WriteString(color.RenderString(color.Debug.Code(), "D ")) + buf.WriteString(color.RenderString(color.Debug.Code(), "D")) } else { - buf.WriteString("D ") + buf.WriteString("D") } case Info: if doColor { - buf.WriteString(color.RenderString(color.Green.Code(), "I ")) + buf.WriteString(color.RenderString(color.Green.Code(), "I")) } else { - buf.WriteString("I ") + buf.WriteString("I") } case Warn: if doColor { - buf.WriteString(color.RenderString(color.Warn.Code(), "W ")) + buf.WriteString(color.RenderString(color.Warn.Code(), "W")) } else { - buf.WriteString("W ") + buf.WriteString("W") + } + + case Error: + if doColor { + buf.WriteString(color.RenderString(color.Error.Code(), "E")) + } else { + buf.WriteString("E") } } + buf.WriteByte(' ') } func writeContent(buf *bytes.Buffer, format string, args []interface{}) {