From 2d0c530d97cf54b6cc1a632c11f057cd80a3b161 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 26 Dec 2023 12:48:35 +0100 Subject: [PATCH] replace new() with initialize() (#490) --- client.go | 10 +- client_format.go | 97 ++++++++--------- client_media.go | 46 ++++---- client_reader.go | 11 +- client_udp_listener.go | 101 +++++++++--------- .../h264_decoder.go | 31 +++--- .../main.go | 3 +- .../main.go | 8 +- .../mpegts_muxer.go | 28 ++--- .../client-play-format-h264/h264_decoder.go | 31 +++--- examples/client-play-format-h264/main.go | 3 +- .../h265_decoder.go | 31 +++--- .../main.go | 3 +- .../main.go | 9 +- .../mpegts_muxer.go | 29 ++--- .../client-play-format-h265/h265_decoder.go | 31 +++--- examples/client-play-format-h265/main.go | 3 +- .../main.go | 7 +- .../mpegts_muxer.go | 38 +++---- examples/proxy/client.go | 8 +- examples/proxy/main.go | 6 +- examples/proxy/server.go | 6 +- examples/server-h264-save-to-disk/main.go | 8 +- .../server-h264-save-to-disk/mpegts_muxer.go | 28 ++--- server.go | 44 +++++--- server_conn.go | 40 +++---- server_conn_reader.go | 9 +- server_multicast_writer.go | 34 +++--- server_record_test.go | 4 +- server_session.go | 50 ++++----- server_session_format.go | 15 +-- server_session_media.go | 27 +++-- server_stream.go | 15 ++- server_stream_format.go | 24 ++--- server_stream_media.go | 28 +++-- server_tcp_listener.go | 21 ++-- server_udp_listener.go | 98 ++++++++--------- 37 files changed, 464 insertions(+), 521 deletions(-) diff --git a/client.go b/client.go index fa539d85..5f149d96 100644 --- a/client.go +++ b/client.go @@ -884,7 +884,10 @@ func (c *Client) connOpen() error { c.nconn = nconn bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent) c.conn = conn.NewConn(bc) - c.reader = newClientReader(c) + c.reader = &clientReader{ + c: c, + } + c.reader.start() return nil } @@ -1290,7 +1293,10 @@ func (c *Client) doSetup( }(), } - cm := newClientMedia(c) + cm := &clientMedia{ + c: c, + onPacketRTCP: func(rtcp.Packet) {}, + } if c.effectiveTransport == nil { if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted diff --git a/client_format.go b/client_format.go index 1f5377e6..b5eeb9ac 100644 --- a/client_format.go +++ b/client_format.go @@ -15,50 +15,43 @@ import ( ) type clientFormat struct { - cm *clientMedia - format format.Format + cm *clientMedia + format format.Format + onPacketRTP OnPacketRTPFunc + udpReorderer *rtpreorderer.Reorderer // play tcpLossDetector *rtplossdetector.LossDetector // play rtcpReceiver *rtcpreceiver.RTCPReceiver // play rtcpSender *rtcpsender.RTCPSender // record or back channel - onPacketRTP OnPacketRTPFunc } -func newClientFormat(cm *clientMedia, forma format.Format) *clientFormat { - return &clientFormat{ - cm: cm, - format: forma, - onPacketRTP: func(*rtp.Packet) {}, - } -} - -func (ct *clientFormat) start() { - if ct.cm.c.state == clientStateRecord || ct.cm.media.IsBackChannel { - ct.rtcpSender = rtcpsender.New( - ct.format.ClockRate(), - ct.cm.c.senderReportPeriod, - ct.cm.c.timeNow, +func (cf *clientFormat) start() { + if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel { + cf.rtcpSender = rtcpsender.New( + cf.format.ClockRate(), + cf.cm.c.senderReportPeriod, + cf.cm.c.timeNow, func(pkt rtcp.Packet) { - if !ct.cm.c.DisableRTCPSenderReports { - ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck + if !cf.cm.c.DisableRTCPSenderReports { + cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck } }) } else { - if ct.cm.udpRTPListener != nil { - ct.udpReorderer = rtpreorderer.New() + if cf.cm.udpRTPListener != nil { + cf.udpReorderer = rtpreorderer.New() } else { - ct.tcpLossDetector = rtplossdetector.New() + cf.tcpLossDetector = rtplossdetector.New() } var err error - ct.rtcpReceiver, err = rtcpreceiver.New( - ct.format.ClockRate(), + cf.rtcpReceiver, err = rtcpreceiver.New( + cf.format.ClockRate(), nil, - ct.cm.c.receiverReportPeriod, - ct.cm.c.timeNow, + cf.cm.c.receiverReportPeriod, + cf.cm.c.timeNow, func(pkt rtcp.Packet) { - if ct.cm.udpRTPListener != nil { - ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck + if cf.cm.udpRTPListener != nil { + cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck } }) if err != nil { @@ -67,22 +60,22 @@ func (ct *clientFormat) start() { } } -func (ct *clientFormat) stop() { - if ct.rtcpReceiver != nil { - ct.rtcpReceiver.Close() - ct.rtcpReceiver = nil +func (cf *clientFormat) stop() { + if cf.rtcpReceiver != nil { + cf.rtcpReceiver.Close() + cf.rtcpReceiver = nil } - if ct.rtcpSender != nil { - ct.rtcpSender.Close() + if cf.rtcpSender != nil { + cf.rtcpSender.Close() } } -func (ct *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { - ct.rtcpSender.ProcessPacket(pkt, ntp, ct.format.PTSEqualsDTS(pkt)) +func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { + cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) - ok := ct.cm.c.writer.push(func() { - ct.cm.writePacketRTPInQueue(byts) + ok := cf.cm.c.writer.push(func() { + cf.cm.writePacketRTPInQueue(byts) }) if !ok { return liberrors.ErrClientWriteQueueFull{} @@ -91,40 +84,40 @@ func (ct *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Ti return nil } -func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { - packets, lost := ct.udpReorderer.Process(pkt) +func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) { + packets, lost := cf.udpReorderer.Process(pkt) if lost != 0 { - ct.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) + cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) // do not return } - now := ct.cm.c.timeNow() + now := cf.cm.c.timeNow() for _, pkt := range packets { - err := ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) + err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { - ct.cm.c.OnDecodeError(err) + cf.cm.c.OnDecodeError(err) continue } - ct.onPacketRTP(pkt) + cf.onPacketRTP(pkt) } } -func (ct *clientFormat) readRTPTCP(pkt *rtp.Packet) { - lost := ct.tcpLossDetector.Process(pkt) +func (cf *clientFormat) readRTPTCP(pkt *rtp.Packet) { + lost := cf.tcpLossDetector.Process(pkt) if lost != 0 { - ct.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) + cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) // do not return } - now := ct.cm.c.timeNow() + now := cf.cm.c.timeNow() - err := ct.rtcpReceiver.ProcessPacket(pkt, now, ct.format.PTSEqualsDTS(pkt)) + err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { - ct.cm.c.OnDecodeError(err) + cf.cm.c.OnDecodeError(err) return } - ct.onPacketRTP(pkt) + cf.onPacketRTP(pkt) } diff --git a/client_media.go b/client_media.go index eef96147..66c76277 100644 --- a/client_media.go +++ b/client_media.go @@ -14,7 +14,9 @@ import ( ) type clientMedia struct { - c *Client + c *Client + onPacketRTCP OnPacketRTCPFunc + media *description.Media formats map[uint8]*clientFormat tcpChannel int @@ -25,14 +27,6 @@ type clientMedia struct { tcpBuffer []byte writePacketRTPInQueue func([]byte) writePacketRTCPInQueue func([]byte) - onPacketRTCP OnPacketRTCPFunc -} - -func newClientMedia(c *Client) *clientMedia { - return &clientMedia{ - c: c, - onPacketRTCP: func(rtcp.Packet) {}, - } } func (cm *clientMedia) close() { @@ -49,22 +43,24 @@ func (cm *clientMedia) allocateUDPListeners( rtcpAddress string, ) error { if rtpAddress != ":0" { - l1, err := newClientUDPListener( - cm.c, - multicastEnable, - multicastSourceIP, - rtpAddress, - ) + l1 := &clientUDPListener{ + c: cm.c, + multicastEnable: multicastEnable, + multicastSourceIP: multicastSourceIP, + address: rtpAddress, + } + err := l1.initialize() if err != nil { return err } - l2, err := newClientUDPListener( - cm.c, - multicastEnable, - multicastSourceIP, - rtcpAddress, - ) + l2 := &clientUDPListener{ + c: cm.c, + multicastEnable: multicastEnable, + multicastSourceIP: multicastSourceIP, + address: rtcpAddress, + } + err = l2.initialize() if err != nil { l1.close() return err @@ -75,7 +71,7 @@ func (cm *clientMedia) allocateUDPListeners( } var err error - cm.udpRTPListener, cm.udpRTCPListener, err = newClientUDPListenerPair(cm.c) + cm.udpRTPListener, cm.udpRTCPListener, err = clientAllocateUDPListenerPair(cm.c) return err } @@ -84,7 +80,11 @@ func (cm *clientMedia) setMedia(medi *description.Media) { cm.formats = make(map[uint8]*clientFormat) for _, forma := range medi.Formats { - cm.formats[forma.PayloadType()] = newClientFormat(cm, forma) + cm.formats[forma.PayloadType()] = &clientFormat{ + cm: cm, + format: forma, + onPacketRTP: func(*rtp.Packet) {}, + } } } diff --git a/client_reader.go b/client_reader.go index 1aa0bb33..ca65d519 100644 --- a/client_reader.go +++ b/client_reader.go @@ -8,19 +8,14 @@ import ( ) type clientReader struct { - c *Client + c *Client + mutex sync.Mutex allowInterleavedFrames bool } -func newClientReader(c *Client) *clientReader { - r := &clientReader{ - c: c, - } - +func (r *clientReader) start() { go r.run() - - return r } func (r *clientReader) setAllowInterleavedFrames(v bool) { diff --git a/client_udp_listener.go b/client_udp_listener.go index 7897c287..7fac5124 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -24,22 +24,7 @@ func randInRange(max int) (int, error) { return int(n.Int64()), nil } -type clientUDPListener struct { - c *Client - pc net.PacketConn - - readFunc readFunc - readIP net.IP - readPort int - writeAddr *net.UDPAddr - - running bool - lastPacketTime *int64 - - done chan struct{} -} - -func newClientUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) { +func clientAllocateUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener, error) { // choose two consecutive ports in range 65535-10000 // RTP port must be even and RTCP port odd for { @@ -49,23 +34,26 @@ func newClientUDPListenerPair(c *Client) (*clientUDPListener, *clientUDPListener } rtpPort := v*2 + 10000 - rtpListener, err := newClientUDPListener( - c, - false, - nil, - net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)), - ) + rtcpPort := rtpPort + 1 + + rtpListener := &clientUDPListener{ + c: c, + multicastEnable: false, + multicastSourceIP: nil, + address: net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)), + } + err = rtpListener.initialize() if err != nil { continue } - rtcpPort := rtpPort + 1 - rtcpListener, err := newClientUDPListener( - c, - false, - nil, - net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)), - ) + rtcpListener := &clientUDPListener{ + c: c, + multicastEnable: false, + multicastSourceIP: nil, + address: net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)), + } + err = rtcpListener.initialize() if err != nil { rtpListener.close() continue @@ -80,42 +68,51 @@ type packetConn interface { SetReadBuffer(int) error } -func newClientUDPListener( - c *Client, - multicastEnable bool, - multicastSourceIP net.IP, - address string, -) (*clientUDPListener, error) { - var pc packetConn - if multicastEnable { - intf, err := multicast.InterfaceForSource(multicastSourceIP) +type clientUDPListener struct { + c *Client + multicastEnable bool + multicastSourceIP net.IP + address string + + pc packetConn + readFunc readFunc + readIP net.IP + readPort int + writeAddr *net.UDPAddr + + running bool + lastPacketTime *int64 + + done chan struct{} +} + +func (u *clientUDPListener) initialize() error { + if u.multicastEnable { + intf, err := multicast.InterfaceForSource(u.multicastSourceIP) if err != nil { - return nil, err + return err } - pc, err = multicast.NewSingleConn(intf, address, c.ListenPacket) + u.pc, err = multicast.NewSingleConn(intf, u.address, u.c.ListenPacket) if err != nil { - return nil, err + return err } } else { - tmp, err := c.ListenPacket(restrictNetwork("udp", address)) + tmp, err := u.c.ListenPacket(restrictNetwork("udp", u.address)) if err != nil { - return nil, err + return err } - pc = tmp.(*net.UDPConn) + u.pc = tmp.(*net.UDPConn) } - err := pc.SetReadBuffer(udpKernelReadBufferSize) + err := u.pc.SetReadBuffer(udpKernelReadBufferSize) if err != nil { - pc.Close() - return nil, err + u.pc.Close() + return err } - return &clientUDPListener{ - c: c, - pc: pc, - lastPacketTime: int64Ptr(0), - }, nil + u.lastPacketTime = int64Ptr(0) + return nil } func (u *clientUDPListener) close() { diff --git a/examples/client-play-format-h264-convert-to-jpeg/h264_decoder.go b/examples/client-play-format-h264-convert-to-jpeg/h264_decoder.go index 13fc1503..af026cc1 100644 --- a/examples/client-play-format-h264-convert-to-jpeg/h264_decoder.go +++ b/examples/client-play-format-h264-convert-to-jpeg/h264_decoder.go @@ -29,34 +29,31 @@ type h264Decoder struct { dstFramePtr []uint8 } -// newH264Decoder allocates a new h264Decoder. -func newH264Decoder() (*h264Decoder, error) { +// initialize initializes a h264Decoder. +func (d *h264Decoder) initialize() error { codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H264) if codec == nil { - return nil, fmt.Errorf("avcodec_find_decoder() failed") + return fmt.Errorf("avcodec_find_decoder() failed") } - codecCtx := C.avcodec_alloc_context3(codec) - if codecCtx == nil { - return nil, fmt.Errorf("avcodec_alloc_context3() failed") + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") } - res := C.avcodec_open2(codecCtx, codec, nil) + res := C.avcodec_open2(d.codecCtx, codec, nil) if res < 0 { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("avcodec_open2() failed") + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") } - srcFrame := C.av_frame_alloc() - if srcFrame == nil { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("av_frame_alloc() failed") + d.srcFrame = C.av_frame_alloc() + if d.srcFrame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") } - return &h264Decoder{ - codecCtx: codecCtx, - srcFrame: srcFrame, - }, nil + return nil } // close closes the decoder. diff --git a/examples/client-play-format-h264-convert-to-jpeg/main.go b/examples/client-play-format-h264-convert-to-jpeg/main.go index b8cd798b..817e1266 100644 --- a/examples/client-play-format-h264-convert-to-jpeg/main.go +++ b/examples/client-play-format-h264-convert-to-jpeg/main.go @@ -78,7 +78,8 @@ func main() { } // setup H264 -> raw frames decoder - frameDec, err := newH264Decoder() + frameDec := &h264Decoder{} + frameDec.initialize() if err != nil { panic(err) } diff --git a/examples/client-play-format-h264-save-to-disk/main.go b/examples/client-play-format-h264-save-to-disk/main.go index 252a049d..cdd2c9cb 100644 --- a/examples/client-play-format-h264-save-to-disk/main.go +++ b/examples/client-play-format-h264-save-to-disk/main.go @@ -51,7 +51,11 @@ func main() { } // setup H264 -> MPEG-TS muxer - mpegtsMuxer, err := newMPEGTSMuxer(forma.SPS, forma.PPS) + mpegtsMuxer := &mpegtsMuxer{ + sps: forma.SPS, + pps: forma.PPS, + } + mpegtsMuxer.initialize() if err != nil { panic(err) } @@ -81,7 +85,7 @@ func main() { } // encode the access unit into MPEG-TS - err = mpegtsMuxer.encode(au, pts) + err = mpegtsMuxer.writeH264(au, pts) if err != nil { log.Printf("ERR: %v", err) return diff --git a/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go b/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go index ac2d665d..3d5d9aac 100644 --- a/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-h264-save-to-disk/mpegts_muxer.go @@ -25,28 +25,22 @@ type mpegtsMuxer struct { dtsExtractor *h264.DTSExtractor } -// newMPEGTSMuxer allocates a mpegtsMuxer. -func newMPEGTSMuxer(sps []byte, pps []byte) (*mpegtsMuxer, error) { - f, err := os.Create("mystream.ts") +// initialize initializes a mpegtsMuxer. +func (e *mpegtsMuxer) initialize() error { + var err error + e.f, err = os.Create("mystream.ts") if err != nil { - return nil, err + return err } - b := bufio.NewWriter(f) + e.b = bufio.NewWriter(e.f) - track := &mpegts.Track{ + e.track = &mpegts.Track{ Codec: &mpegts.CodecH264{}, } - w := mpegts.NewWriter(b, []*mpegts.Track{track}) + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.track}) - return &mpegtsMuxer{ - sps: sps, - pps: pps, - f: f, - b: b, - w: w, - track: track, - }, nil + return nil } // close closes all the mpegtsMuxer resources. @@ -55,8 +49,8 @@ func (e *mpegtsMuxer) close() { e.f.Close() } -// encode encodes a H264 access unit into MPEG-TS. -func (e *mpegtsMuxer) encode(au [][]byte, pts time.Duration) error { +// writeH264 writes a H264 access unit into MPEG-TS. +func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { // prepend an AUD. This is required by some players filteredAU := [][]byte{ {byte(h264.NALUTypeAccessUnitDelimiter), 240}, diff --git a/examples/client-play-format-h264/h264_decoder.go b/examples/client-play-format-h264/h264_decoder.go index 13fc1503..af026cc1 100644 --- a/examples/client-play-format-h264/h264_decoder.go +++ b/examples/client-play-format-h264/h264_decoder.go @@ -29,34 +29,31 @@ type h264Decoder struct { dstFramePtr []uint8 } -// newH264Decoder allocates a new h264Decoder. -func newH264Decoder() (*h264Decoder, error) { +// initialize initializes a h264Decoder. +func (d *h264Decoder) initialize() error { codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H264) if codec == nil { - return nil, fmt.Errorf("avcodec_find_decoder() failed") + return fmt.Errorf("avcodec_find_decoder() failed") } - codecCtx := C.avcodec_alloc_context3(codec) - if codecCtx == nil { - return nil, fmt.Errorf("avcodec_alloc_context3() failed") + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") } - res := C.avcodec_open2(codecCtx, codec, nil) + res := C.avcodec_open2(d.codecCtx, codec, nil) if res < 0 { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("avcodec_open2() failed") + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") } - srcFrame := C.av_frame_alloc() - if srcFrame == nil { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("av_frame_alloc() failed") + d.srcFrame = C.av_frame_alloc() + if d.srcFrame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") } - return &h264Decoder{ - codecCtx: codecCtx, - srcFrame: srcFrame, - }, nil + return nil } // close closes the decoder. diff --git a/examples/client-play-format-h264/main.go b/examples/client-play-format-h264/main.go index 6e2abf2c..19de7783 100644 --- a/examples/client-play-format-h264/main.go +++ b/examples/client-play-format-h264/main.go @@ -54,7 +54,8 @@ func main() { } // setup H264 -> raw frames decoder - frameDec, err := newH264Decoder() + frameDec := &h264Decoder{} + frameDec.initialize() if err != nil { panic(err) } diff --git a/examples/client-play-format-h265-convert-to-jpeg/h265_decoder.go b/examples/client-play-format-h265-convert-to-jpeg/h265_decoder.go index fc01aaed..9bcbaa50 100644 --- a/examples/client-play-format-h265-convert-to-jpeg/h265_decoder.go +++ b/examples/client-play-format-h265-convert-to-jpeg/h265_decoder.go @@ -29,34 +29,31 @@ type h265Decoder struct { dstFramePtr []uint8 } -// newH265Decoder allocates a new h265Decoder. -func newH265Decoder() (*h265Decoder, error) { +// initialize initializes a h265Decoder. +func (d *h265Decoder) initialize() error { codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H265) if codec == nil { - return nil, fmt.Errorf("avcodec_find_decoder() failed") + return fmt.Errorf("avcodec_find_decoder() failed") } - codecCtx := C.avcodec_alloc_context3(codec) - if codecCtx == nil { - return nil, fmt.Errorf("avcodec_alloc_context3() failed") + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") } - res := C.avcodec_open2(codecCtx, codec, nil) + res := C.avcodec_open2(d.codecCtx, codec, nil) if res < 0 { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("avcodec_open2() failed") + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") } - srcFrame := C.av_frame_alloc() - if srcFrame == nil { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("av_frame_alloc() failed") + d.srcFrame = C.av_frame_alloc() + if d.srcFrame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") } - return &h265Decoder{ - codecCtx: codecCtx, - srcFrame: srcFrame, - }, nil + return nil } // close closes the decoder. diff --git a/examples/client-play-format-h265-convert-to-jpeg/main.go b/examples/client-play-format-h265-convert-to-jpeg/main.go index 2700c0e9..d48556ed 100644 --- a/examples/client-play-format-h265-convert-to-jpeg/main.go +++ b/examples/client-play-format-h265-convert-to-jpeg/main.go @@ -78,7 +78,8 @@ func main() { } // setup H265 -> raw frames decoder - frameDec, err := newH265Decoder() + frameDec := &h265Decoder{} + err = frameDec.initialize() if err != nil { panic(err) } diff --git a/examples/client-play-format-h265-save-to-disk/main.go b/examples/client-play-format-h265-save-to-disk/main.go index 9d58f281..2689e418 100644 --- a/examples/client-play-format-h265-save-to-disk/main.go +++ b/examples/client-play-format-h265-save-to-disk/main.go @@ -51,7 +51,12 @@ func main() { } // setup H265 -> MPEG-TS muxer - mpegtsMuxer, err := newMPEGTSMuxer(forma.VPS, forma.SPS, forma.PPS) + mpegtsMuxer := &mpegtsMuxer{ + vps: forma.VPS, + sps: forma.SPS, + pps: forma.PPS, + } + mpegtsMuxer.initialize() if err != nil { panic(err) } @@ -81,7 +86,7 @@ func main() { } // encode the access unit into MPEG-TS - err = mpegtsMuxer.encode(au, pts) + err = mpegtsMuxer.writeH265(au, pts) if err != nil { log.Printf("ERR: %v", err) return diff --git a/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go b/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go index 39d8c612..2b8c409a 100644 --- a/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-h265-save-to-disk/mpegts_muxer.go @@ -26,29 +26,22 @@ type mpegtsMuxer struct { dtsExtractor *h265.DTSExtractor } -// newMPEGTSMuxer allocates a mpegtsMuxer. -func newMPEGTSMuxer(vps []byte, sps []byte, pps []byte) (*mpegtsMuxer, error) { - f, err := os.Create("mystream.ts") +// initialize initializes a mpegtsMuxer. +func (e *mpegtsMuxer) initialize() error { + var err error + e.f, err = os.Create("mystream.ts") if err != nil { - return nil, err + return err } - b := bufio.NewWriter(f) + e.b = bufio.NewWriter(e.f) - track := &mpegts.Track{ + e.track = &mpegts.Track{ Codec: &mpegts.CodecH265{}, } - w := mpegts.NewWriter(b, []*mpegts.Track{track}) + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.track}) - return &mpegtsMuxer{ - vps: vps, - sps: sps, - pps: pps, - f: f, - b: b, - w: w, - track: track, - }, nil + return nil } // close closes all the mpegtsMuxer resources. @@ -57,8 +50,8 @@ func (e *mpegtsMuxer) close() { e.f.Close() } -// encode encodes a H265 access unit into MPEG-TS. -func (e *mpegtsMuxer) encode(au [][]byte, pts time.Duration) error { +// writeH265 writes a H265 access unit into MPEG-TS. +func (e *mpegtsMuxer) writeH265(au [][]byte, pts time.Duration) error { // prepend an AUD. This is required by some players filteredAU := [][]byte{ {byte(h265.NALUType_AUD_NUT) << 1, 1, 0x50}, diff --git a/examples/client-play-format-h265/h265_decoder.go b/examples/client-play-format-h265/h265_decoder.go index fc01aaed..9bcbaa50 100644 --- a/examples/client-play-format-h265/h265_decoder.go +++ b/examples/client-play-format-h265/h265_decoder.go @@ -29,34 +29,31 @@ type h265Decoder struct { dstFramePtr []uint8 } -// newH265Decoder allocates a new h265Decoder. -func newH265Decoder() (*h265Decoder, error) { +// initialize initializes a h265Decoder. +func (d *h265Decoder) initialize() error { codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H265) if codec == nil { - return nil, fmt.Errorf("avcodec_find_decoder() failed") + return fmt.Errorf("avcodec_find_decoder() failed") } - codecCtx := C.avcodec_alloc_context3(codec) - if codecCtx == nil { - return nil, fmt.Errorf("avcodec_alloc_context3() failed") + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") } - res := C.avcodec_open2(codecCtx, codec, nil) + res := C.avcodec_open2(d.codecCtx, codec, nil) if res < 0 { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("avcodec_open2() failed") + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") } - srcFrame := C.av_frame_alloc() - if srcFrame == nil { - C.avcodec_close(codecCtx) - return nil, fmt.Errorf("av_frame_alloc() failed") + d.srcFrame = C.av_frame_alloc() + if d.srcFrame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") } - return &h265Decoder{ - codecCtx: codecCtx, - srcFrame: srcFrame, - }, nil + return nil } // close closes the decoder. diff --git a/examples/client-play-format-h265/main.go b/examples/client-play-format-h265/main.go index c766d877..5145e61d 100644 --- a/examples/client-play-format-h265/main.go +++ b/examples/client-play-format-h265/main.go @@ -54,7 +54,8 @@ func main() { } // setup H265 -> raw frames decoder - frameDec, err := newH265Decoder() + frameDec := &h265Decoder{} + err = frameDec.initialize() if err != nil { panic(err) } diff --git a/examples/client-play-format-mpeg4audio-save-to-disk/main.go b/examples/client-play-format-mpeg4audio-save-to-disk/main.go index 1b3c3ace..0dc02a3e 100644 --- a/examples/client-play-format-mpeg4audio-save-to-disk/main.go +++ b/examples/client-play-format-mpeg4audio-save-to-disk/main.go @@ -50,7 +50,10 @@ func main() { } // setup MPEG-4 audio -> MPEG-TS muxer - mpegtsMuxer, err := newMPEGTSMuxer(forma.Config) + mpegtsMuxer := &mpegtsMuxer{ + config: forma.Config, + } + mpegtsMuxer.initialize() if err != nil { panic(err) } @@ -78,7 +81,7 @@ func main() { } // encode access units into MPEG-TS - err = mpegtsMuxer.encode(aus, pts) + err = mpegtsMuxer.writeMPEG4Audio(aus, pts) if err != nil { log.Printf("ERR: %v", err) return diff --git a/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go b/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go index a5fc7a1a..c503b183 100644 --- a/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go +++ b/examples/client-play-format-mpeg4audio-save-to-disk/mpegts_muxer.go @@ -16,35 +16,31 @@ func durationGoToMPEGTS(v time.Duration) int64 { // mpegtsMuxer allows to save a MPEG4-audio stream into a MPEG-TS file. type mpegtsMuxer struct { config *mpeg4audio.Config - f *os.File - b *bufio.Writer - w *mpegts.Writer - track *mpegts.Track + + f *os.File + b *bufio.Writer + w *mpegts.Writer + track *mpegts.Track } -// newMPEGTSMuxer allocates a mpegtsMuxer. -func newMPEGTSMuxer(config *mpeg4audio.Config) (*mpegtsMuxer, error) { - f, err := os.Create("mystream.ts") +// initialize initializes a mpegtsMuxer. +func (e *mpegtsMuxer) initialize() error { + var err error + e.f, err = os.Create("mystream.ts") if err != nil { - return nil, err + return err } - b := bufio.NewWriter(f) + e.b = bufio.NewWriter(e.f) - track := &mpegts.Track{ + e.track = &mpegts.Track{ Codec: &mpegts.CodecMPEG4Audio{ - Config: *config, + Config: *e.config, }, } - w := mpegts.NewWriter(b, []*mpegts.Track{track}) + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.track}) - return &mpegtsMuxer{ - config: config, - f: f, - b: b, - w: w, - track: track, - }, nil + return nil } // close closes all the mpegtsMuxer resources. @@ -53,7 +49,7 @@ func (e *mpegtsMuxer) close() { e.f.Close() } -// encode encodes MPEG-4 audio access units into MPEG-TS. -func (e *mpegtsMuxer) encode(aus [][]byte, pts time.Duration) error { +// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. +func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts time.Duration) error { return e.w.WriteMPEG4Audio(e.track, durationGoToMPEGTS(pts), aus) } diff --git a/examples/proxy/client.go b/examples/proxy/client.go index 5936dd4b..7a9956c5 100644 --- a/examples/proxy/client.go +++ b/examples/proxy/client.go @@ -20,15 +20,9 @@ type client struct { s *server } -func newClient(s *server) *client { - c := &client{ - s: s, - } - +func (c *client) initialize() { // start a separated routine go c.run() - - return c } func (c *client) run() { diff --git a/examples/proxy/main.go b/examples/proxy/main.go index dbe9606e..6b1919b8 100644 --- a/examples/proxy/main.go +++ b/examples/proxy/main.go @@ -9,11 +9,13 @@ import "log" func main() { // allocate the server. - s := newServer() + s := &server{} + s.initialize() // allocate the client. // give client access to the server. - newClient(s) + c := &client{s: s} + c.initialize() // start server and wait until a fatal error log.Printf("server is ready") diff --git a/examples/proxy/server.go b/examples/proxy/server.go index 660f9c7a..680817d4 100644 --- a/examples/proxy/server.go +++ b/examples/proxy/server.go @@ -15,9 +15,7 @@ type server struct { stream *gortsplib.ServerStream } -func newServer() *server { - s := &server{} - +func (s *server) initialize() { // configure the server s.s = &gortsplib.Server{ Handler: s, @@ -28,8 +26,6 @@ func newServer() *server { MulticastRTPPort: 8002, MulticastRTCPPort: 8003, } - - return s } // called when a connection is opened. diff --git a/examples/server-h264-save-to-disk/main.go b/examples/server-h264-save-to-disk/main.go index fed4889f..00757c2c 100644 --- a/examples/server-h264-save-to-disk/main.go +++ b/examples/server-h264-save-to-disk/main.go @@ -83,7 +83,11 @@ func (sh *serverHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) ( } // setup H264 -> MPEGTS muxer - mpegtsMuxer, err := newMPEGTSMuxer(forma.SPS, forma.PPS) + mpegtsMuxer := &mpegtsMuxer{ + sps: forma.SPS, + pps: forma.PPS, + } + mpegtsMuxer.initialize() if err != nil { return &base.Response{ StatusCode: base.StatusBadRequest, @@ -128,7 +132,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas } // encode H264 access unit into MPEG-TS - sh.mpegtsMuxer.encode(au, pts) + sh.mpegtsMuxer.writeH264(au, pts) }) return &base.Response{ diff --git a/examples/server-h264-save-to-disk/mpegts_muxer.go b/examples/server-h264-save-to-disk/mpegts_muxer.go index f36fe4e2..9202e20e 100644 --- a/examples/server-h264-save-to-disk/mpegts_muxer.go +++ b/examples/server-h264-save-to-disk/mpegts_muxer.go @@ -25,28 +25,22 @@ type mpegtsMuxer struct { dtsExtractor *h264.DTSExtractor } -// newMPEGTSMuxer allocates a mpegtsMuxer. -func newMPEGTSMuxer(sps []byte, pps []byte) (*mpegtsMuxer, error) { - f, err := os.Create("mystream.ts") +// initialize initializes a mpegtsMuxer. +func (e *mpegtsMuxer) initialize() error { + var err error + e.f, err = os.Create("mystream.ts") if err != nil { - return nil, err + return err } - b := bufio.NewWriter(f) + e.b = bufio.NewWriter(e.f) - track := &mpegts.Track{ + e.track = &mpegts.Track{ Codec: &mpegts.CodecH264{}, } - w := mpegts.NewWriter(b, []*mpegts.Track{track}) + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.track}) - return &mpegtsMuxer{ - sps: sps, - pps: pps, - f: f, - b: b, - w: w, - track: track, - }, nil + return nil } // close closes all the mpegtsMuxer resources. @@ -55,8 +49,8 @@ func (e *mpegtsMuxer) close() { e.f.Close() } -// encode encodes a H264 access unit into MPEG-TS. -func (e *mpegtsMuxer) encode(au [][]byte, pts time.Duration) error { +// writeH264 writes a H264 access unit into MPEG-TS. +func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error { // prepend an AUD. This is required by some players filteredAU := [][]byte{ {byte(h264.NALUTypeAccessUnitDelimiter), 240}, diff --git a/server.go b/server.go index 05e88617..cf4885d4 100644 --- a/server.go +++ b/server.go @@ -218,22 +218,24 @@ func (s *Server) Start() error { return fmt.Errorf("RTP and RTCP ports must be consecutive") } - s.udpRTPListener, err = newServerUDPListener( - s.ListenPacket, - s.WriteTimeout, - false, - s.UDPRTPAddress, - ) + s.udpRTPListener = &serverUDPListener{ + listenPacket: s.ListenPacket, + writeTimeout: s.WriteTimeout, + multicastEnable: false, + address: s.UDPRTPAddress, + } + err = s.udpRTPListener.initialize() if err != nil { return err } - s.udpRTCPListener, err = newServerUDPListener( - s.ListenPacket, - s.WriteTimeout, - false, - s.UDPRTCPAddress, - ) + s.udpRTCPListener = &serverUDPListener{ + listenPacket: s.ListenPacket, + writeTimeout: s.WriteTimeout, + multicastEnable: false, + address: s.UDPRTCPAddress, + } + err = s.udpRTCPListener.initialize() if err != nil { s.udpRTPListener.close() return err @@ -299,8 +301,10 @@ func (s *Server) Start() error { s.chCloseSession = make(chan *ServerSession) s.chGetMulticastIP = make(chan chGetMulticastIPReq) - var err error - s.tcpListener, err = newServerTCPListener(s) + s.tcpListener = &serverTCPListener{ + s: s, + } + err := s.tcpListener.initialize() if err != nil { if s.udpRTPListener != nil { s.udpRTPListener.close() @@ -356,7 +360,11 @@ func (s *Server) runInner() error { return err case nconn := <-s.chNewConn: - sc := newServerConn(s, nconn) + sc := &ServerConn{ + s: s, + nconn: nconn, + } + sc.initialize() s.conns[sc] = struct{}{} case sc := <-s.chCloseConn: @@ -400,7 +408,11 @@ func (s *Server) runInner() error { continue } - ss := newServerSession(s, req.sc) + ss := &ServerSession{ + s: s, + author: req.sc, + } + ss.initialize() s.sessions[ss.secretID] = ss select { diff --git a/server_conn.go b/server_conn.go index eb26fa68..b51ef5c0 100644 --- a/server_conn.go +++ b/server_conn.go @@ -81,33 +81,24 @@ type ServerConn struct { done chan struct{} } -func newServerConn( - s *Server, - nconn net.Conn, -) *ServerConn { - ctx, ctxCancel := context.WithCancel(s.ctx) +func (sc *ServerConn) initialize() { + ctx, ctxCancel := context.WithCancel(sc.s.ctx) - if s.TLSConfig != nil { - nconn = tls.Server(nconn, s.TLSConfig) + if sc.s.TLSConfig != nil { + sc.nconn = tls.Server(sc.nconn, sc.s.TLSConfig) } - sc := &ServerConn{ - s: s, - nconn: nconn, - bc: bytecounter.New(nconn, nil, nil), - ctx: ctx, - ctxCancel: ctxCancel, - remoteAddr: nconn.RemoteAddr().(*net.TCPAddr), - chReadRequest: make(chan readReq), - chReadError: make(chan error), - chRemoveSession: make(chan *ServerSession), - done: make(chan struct{}), - } + sc.bc = bytecounter.New(sc.nconn, nil, nil) + sc.ctx = ctx + sc.ctxCancel = ctxCancel + sc.remoteAddr = sc.nconn.RemoteAddr().(*net.TCPAddr) + sc.chReadRequest = make(chan readReq) + sc.chReadError = make(chan error) + sc.chRemoveSession = make(chan *ServerSession) + sc.done = make(chan struct{}) - s.wg.Add(1) + sc.s.wg.Add(1) go sc.run() - - return sc } // Close closes the ServerConn. @@ -159,7 +150,10 @@ func (sc *ServerConn) run() { } sc.conn = conn.NewConn(sc.bc) - cr := newServerConnReader(sc) + cr := &serverConnReader{ + sc: sc, + } + cr.initialize() err := sc.runInner() diff --git a/server_conn_reader.go b/server_conn_reader.go index 43d6660b..489e44b0 100644 --- a/server_conn_reader.go +++ b/server_conn_reader.go @@ -27,15 +27,10 @@ type serverConnReader struct { chReadDone chan struct{} } -func newServerConnReader(sc *ServerConn) *serverConnReader { - cr := &serverConnReader{ - sc: sc, - chReadDone: make(chan struct{}), - } +func (cr *serverConnReader) initialize() { + cr.chReadDone = make(chan struct{}) go cr.run() - - return cr } func (cr *serverConnReader) wait() { diff --git a/server_multicast_writer.go b/server_multicast_writer.go index cc2af0ef..6795aca3 100644 --- a/server_multicast_writer.go +++ b/server_multicast_writer.go @@ -7,6 +7,8 @@ import ( ) type serverMulticastWriter struct { + s *Server + rtpl *serverUDPListener rtcpl *serverUDPListener writer asyncProcessor @@ -14,21 +16,21 @@ type serverMulticastWriter struct { rtcpAddr *net.UDPAddr } -func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { - ip, err := s.getMulticastIP() +func (h *serverMulticastWriter) initialize() error { + ip, err := h.s.getMulticastIP() if err != nil { - return nil, err + return err } - rtpl, rtcpl, err := newServerUDPListenerMulticastPair( - s.ListenPacket, - s.WriteTimeout, - s.MulticastRTPPort, - s.MulticastRTCPPort, + rtpl, rtcpl, err := serverAllocateUDPListenerMulticastPair( + h.s.ListenPacket, + h.s.WriteTimeout, + h.s.MulticastRTPPort, + h.s.MulticastRTCPPort, ip, ) if err != nil { - return nil, err + return err } rtpAddr := &net.UDPAddr{ @@ -41,17 +43,15 @@ func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { Port: rtcpl.port(), } - h := &serverMulticastWriter{ - rtpl: rtpl, - rtcpl: rtcpl, - rtpAddr: rtpAddr, - rtcpAddr: rtcpAddr, - } + h.rtpl = rtpl + h.rtcpl = rtcpl + h.rtpAddr = rtpAddr + h.rtcpAddr = rtcpAddr - h.writer.allocateBuffer(s.WriteQueueSize) + h.writer.allocateBuffer(h.s.WriteQueueSize) h.writer.start() - return h, nil + return nil } func (h *serverMulticastWriter) close() { diff --git a/server_record_test.go b/server_record_test.go index 512ba956..e9f5c988 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -61,7 +61,7 @@ func invalidURLAnnounceReq(t *testing.T, control string) base.Request { UnicastAddress: "127.0.0.1", }, TimeDescriptions: []psdp.TimeDescription{ - {Timing: psdp.Timing{0, 0}}, //nolint:govet + {Timing: psdp.Timing{}}, }, MediaDescriptions: []*psdp.MediaDescription{medi.Marshal()}, } @@ -272,7 +272,7 @@ func TestServerRecordPath(t *testing.T) { UnicastAddress: "127.0.0.1", }, TimeDescriptions: []psdp.TimeDescription{ - {Timing: psdp.Timing{0, 0}}, //nolint:govet + {Timing: psdp.Timing{}}, }, MediaDescriptions: []*psdp.MediaDescription{media.Marshal()}, } diff --git a/server_session.go b/server_session.go index bfe8163e..e4a56dd2 100644 --- a/server_session.go +++ b/server_session.go @@ -177,10 +177,10 @@ func (s ServerSessionState) String() string { // ServerSession is a server-side RTSP session. type ServerSession struct { - s *Server - secretID string // must not be shared, allows to take ownership of the session - author *ServerConn + s *Server + author *ServerConn + secretID string // must not be shared, allows to take ownership of the session ctx context.Context ctxCancel func() bytesReceived *uint64 @@ -209,35 +209,26 @@ type ServerSession struct { chStartWriter chan struct{} } -func newServerSession( - s *Server, - author *ServerConn, -) *ServerSession { - ctx, ctxCancel := context.WithCancel(s.ctx) +func (ss *ServerSession) initialize() { + ctx, ctxCancel := context.WithCancel(ss.s.ctx) // use an UUID without dashes, since dashes confuse some clients. secretID := strings.ReplaceAll(uuid.New().String(), "-", "") - ss := &ServerSession{ - s: s, - secretID: secretID, - author: author, - ctx: ctx, - ctxCancel: ctxCancel, - bytesReceived: new(uint64), - bytesSent: new(uint64), - conns: make(map[*ServerConn]struct{}), - lastRequestTime: s.timeNow(), - udpCheckStreamTimer: emptyTimer(), - chHandleRequest: make(chan sessionRequestReq), - chRemoveConn: make(chan *ServerConn), - chStartWriter: make(chan struct{}), - } + ss.secretID = secretID + ss.ctx = ctx + ss.ctxCancel = ctxCancel + ss.bytesReceived = new(uint64) + ss.bytesSent = new(uint64) + ss.conns = make(map[*ServerConn]struct{}) + ss.lastRequestTime = ss.s.timeNow() + ss.udpCheckStreamTimer = emptyTimer() + ss.chHandleRequest = make(chan sessionRequestReq) + ss.chRemoveConn = make(chan *ServerConn) + ss.chStartWriter = make(chan struct{}) - s.wg.Add(1) + ss.s.wg.Add(1) go ss.run() - - return ss } // Close closes the ServerSession. @@ -831,7 +822,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( res.Header = make(base.Header) } - sm := newServerSessionMedia(ss, medi) + sm := &serverSessionMedia{ + ss: ss, + media: medi, + onPacketRTCP: func(p rtcp.Packet) {}, + } + sm.initialize() switch transport { case TransportUDP: diff --git a/server_session_format.go b/server_session_format.go index 658683f4..7988dc80 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -14,20 +14,13 @@ import ( ) type serverSessionFormat struct { - sm *serverSessionMedia - format format.Format + sm *serverSessionMedia + format format.Format + onPacketRTP OnPacketRTPFunc + udpReorderer *rtpreorderer.Reorderer tcpLossDetector *rtplossdetector.LossDetector rtcpReceiver *rtcpreceiver.RTCPReceiver - onPacketRTP OnPacketRTPFunc -} - -func newServerSessionFormat(sm *serverSessionMedia, forma format.Format) *serverSessionFormat { - return &serverSessionFormat{ - sm: sm, - format: forma, - onPacketRTP: func(*rtp.Packet) {}, - } } func (sf *serverSessionFormat) start() { diff --git a/server_session_media.go b/server_session_media.go index 3e5a2f74..858d9114 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -14,8 +14,10 @@ import ( ) type serverSessionMedia struct { - ss *ServerSession - media *description.Media + ss *ServerSession + media *description.Media + onPacketRTCP OnPacketRTCPFunc + tcpChannel int udpRTPReadPort int udpRTPWriteAddr *net.UDPAddr @@ -27,24 +29,19 @@ type serverSessionMedia struct { formats map[uint8]*serverSessionFormat // record only writePacketRTPInQueue func([]byte) writePacketRTCPInQueue func([]byte) - onPacketRTCP OnPacketRTCPFunc } -func newServerSessionMedia(ss *ServerSession, medi *description.Media) *serverSessionMedia { - sm := &serverSessionMedia{ - ss: ss, - media: medi, - onPacketRTCP: func(rtcp.Packet) {}, - } - - if ss.state == ServerSessionStatePreRecord { +func (sm *serverSessionMedia) initialize() { + if sm.ss.state == ServerSessionStatePreRecord { sm.formats = make(map[uint8]*serverSessionFormat) - for _, forma := range medi.Formats { - sm.formats[forma.PayloadType()] = newServerSessionFormat(sm, forma) + for _, forma := range sm.media.Formats { + sm.formats[forma.PayloadType()] = &serverSessionFormat{ + sm: sm, + format: forma, + onPacketRTP: func(*rtp.Packet) {}, + } } } - - return sm } func (sm *serverSessionMedia) start() { diff --git a/server_stream.go b/server_stream.go index cce0b817..b479497e 100644 --- a/server_stream.go +++ b/server_stream.go @@ -53,7 +53,13 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream { st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) for i, medi := range desc.Medias { - st.streamMedias[medi] = newServerStreamMedia(st, medi, i) + sm := &serverStreamMedia{ + st: st, + media: medi, + trackID: i, + } + sm.initialize() + st.streamMedias[medi] = sm } return st @@ -170,11 +176,14 @@ func (st *ServerStream) readerAdd( case TransportUDPMulticast: if st.multicastReaderCount == 0 { for _, media := range st.streamMedias { - mh, err := newServerMulticastWriter(st.s) + mw := &serverMulticastWriter{ + s: st.s, + } + err := mw.initialize() if err != nil { return err } - media.multicastWriter = mh + media.multicastWriter = mw } } st.multicastReaderCount++ diff --git a/server_stream_format.go b/server_stream_format.go index 09114dba..0b21346c 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -12,29 +12,23 @@ import ( ) type serverStreamFormat struct { - sm *serverStreamMedia - format format.Format + sm *serverStreamMedia + format format.Format + rtcpSender *rtcpsender.RTCPSender } -func newServerStreamFormat(sm *serverStreamMedia, forma format.Format) *serverStreamFormat { - sf := &serverStreamFormat{ - sm: sm, - format: forma, - } - +func (sf *serverStreamFormat) initialize() { sf.rtcpSender = rtcpsender.New( - forma.ClockRate(), - sm.st.s.senderReportPeriod, - sm.st.s.timeNow, + sf.format.ClockRate(), + sf.sm.st.s.senderReportPeriod, + sf.sm.st.s.timeNow, func(pkt rtcp.Packet) { - if !sm.st.s.DisableRTCPSenderReports { - sm.st.WritePacketRTCP(sm.media, pkt) //nolint:errcheck + if !sf.sm.st.s.DisableRTCPSenderReports { + sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck } }, ) - - return sf } func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { diff --git a/server_stream_media.go b/server_stream_media.go index 5368d9c8..c984c6e5 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -5,28 +5,24 @@ import ( ) type serverStreamMedia struct { - st *ServerStream - media *description.Media - trackID int + st *ServerStream + media *description.Media + trackID int + formats map[uint8]*serverStreamFormat multicastWriter *serverMulticastWriter } -func newServerStreamMedia(st *ServerStream, medi *description.Media, trackID int) *serverStreamMedia { - sm := &serverStreamMedia{ - st: st, - media: medi, - trackID: trackID, - } - +func (sm *serverStreamMedia) initialize() { sm.formats = make(map[uint8]*serverStreamFormat) - for _, forma := range medi.Formats { - sm.formats[forma.PayloadType()] = newServerStreamFormat( - sm, - forma) + for _, forma := range sm.media.Formats { + sf := &serverStreamFormat{ + sm: sm, + format: forma, + } + sf.initialize() + sm.formats[forma.PayloadType()] = sf } - - return sm } func (sm *serverStreamMedia) close() { diff --git a/server_tcp_listener.go b/server_tcp_listener.go index 7462020e..e90244c7 100644 --- a/server_tcp_listener.go +++ b/server_tcp_listener.go @@ -5,27 +5,22 @@ import ( ) type serverTCPListener struct { - s *Server + s *Server + ln net.Listener } -func newServerTCPListener( - s *Server, -) (*serverTCPListener, error) { - ln, err := s.Listen(restrictNetwork("tcp", s.RTSPAddress)) +func (sl *serverTCPListener) initialize() error { + var err error + sl.ln, err = sl.s.Listen(restrictNetwork("tcp", sl.s.RTSPAddress)) if err != nil { - return nil, err + return err } - sl := &serverTCPListener{ - s: s, - ln: ln, - } - - s.wg.Add(1) + sl.s.wg.Add(1) go sl.run() - return sl, nil + return nil } func (sl *serverTCPListener) close() { diff --git a/server_udp_listener.go b/server_udp_listener.go index 6c6082d4..cbc968bc 100644 --- a/server_udp_listener.go +++ b/server_udp_listener.go @@ -25,39 +25,31 @@ func (p *clientAddr) fill(ip net.IP, port int) { } } -type serverUDPListener struct { - pc net.PacketConn - listenIP net.IP - writeTimeout time.Duration - clientsMutex sync.RWMutex - clients map[clientAddr]readFunc - - done chan struct{} -} - -func newServerUDPListenerMulticastPair( +func serverAllocateUDPListenerMulticastPair( listenPacket func(network, address string) (net.PacketConn, error), writeTimeout time.Duration, multicastRTPPort int, multicastRTCPPort int, ip net.IP, ) (*serverUDPListener, *serverUDPListener, error) { - rtpl, err := newServerUDPListener( - listenPacket, - writeTimeout, - true, - net.JoinHostPort(ip.String(), strconv.FormatInt(int64(multicastRTPPort), 10)), - ) + rtpl := &serverUDPListener{ + listenPacket: listenPacket, + writeTimeout: writeTimeout, + multicastEnable: true, + address: net.JoinHostPort(ip.String(), strconv.FormatInt(int64(multicastRTPPort), 10)), + } + err := rtpl.initialize() if err != nil { return nil, nil, err } - rtcpl, err := newServerUDPListener( - listenPacket, - writeTimeout, - true, - net.JoinHostPort(ip.String(), strconv.FormatInt(int64(multicastRTCPPort), 10)), - ) + rtcpl := &serverUDPListener{ + listenPacket: listenPacket, + writeTimeout: writeTimeout, + multicastEnable: true, + address: net.JoinHostPort(ip.String(), strconv.FormatInt(int64(multicastRTCPPort), 10)), + } + err = rtcpl.initialize() if err != nil { rtpl.close() return nil, nil, err @@ -66,52 +58,54 @@ func newServerUDPListenerMulticastPair( return rtpl, rtcpl, nil } -func newServerUDPListener( - listenPacket func(network, address string) (net.PacketConn, error), - writeTimeout time.Duration, - multicastEnable bool, - address string, -) (*serverUDPListener, error) { - var pc packetConn - var listenIP net.IP - if multicastEnable { +type serverUDPListener struct { + listenPacket func(network, address string) (net.PacketConn, error) + writeTimeout time.Duration + multicastEnable bool + address string + + pc packetConn + listenIP net.IP + clientsMutex sync.RWMutex + clients map[clientAddr]readFunc + + done chan struct{} +} + +func (u *serverUDPListener) initialize() error { + if u.multicastEnable { var err error - pc, err = multicast.NewMultiConn(address, false, listenPacket) + u.pc, err = multicast.NewMultiConn(u.address, false, u.listenPacket) if err != nil { - return nil, err + return err } - host, _, err := net.SplitHostPort(address) + host, _, err := net.SplitHostPort(u.address) if err != nil { - return nil, err + return err } - listenIP = net.ParseIP(host) + u.listenIP = net.ParseIP(host) } else { - tmp, err := listenPacket(restrictNetwork("udp", address)) + tmp, err := u.listenPacket(restrictNetwork("udp", u.address)) if err != nil { - return nil, err + return err } - pc = tmp.(*net.UDPConn) - listenIP = tmp.LocalAddr().(*net.UDPAddr).IP + u.pc = tmp.(*net.UDPConn) + u.listenIP = tmp.LocalAddr().(*net.UDPAddr).IP } - err := pc.SetReadBuffer(udpKernelReadBufferSize) + err := u.pc.SetReadBuffer(udpKernelReadBufferSize) if err != nil { - pc.Close() - return nil, err + u.pc.Close() + return err } - u := &serverUDPListener{ - pc: pc, - listenIP: listenIP, - clients: make(map[clientAddr]readFunc), - writeTimeout: writeTimeout, - done: make(chan struct{}), - } + u.clients = make(map[clientAddr]readFunc) + u.done = make(chan struct{}) go u.run() - return u, nil + return nil } func (u *serverUDPListener) close() {