diff --git a/clientconf.go b/clientconf.go index bc6ff7e1..a2f01c7a 100644 --- a/clientconf.go +++ b/clientconf.go @@ -68,34 +68,34 @@ type ClientConf struct { } // Dial connects to a server. -func (d ClientConf) Dial(host string) (*ClientConn, error) { - if d.ReadTimeout == 0 { - d.ReadTimeout = 10 * time.Second +func (c ClientConf) Dial(host string) (*ClientConn, error) { + if c.ReadTimeout == 0 { + c.ReadTimeout = 10 * time.Second } - if d.WriteTimeout == 0 { - d.WriteTimeout = 10 * time.Second + if c.WriteTimeout == 0 { + c.WriteTimeout = 10 * time.Second } - if d.ReadBufferCount == 0 { - d.ReadBufferCount = 1 + if c.ReadBufferCount == 0 { + c.ReadBufferCount = 1 } - if d.DialTimeout == nil { - d.DialTimeout = net.DialTimeout + if c.DialTimeout == nil { + c.DialTimeout = net.DialTimeout } - if d.ListenPacket == nil { - d.ListenPacket = net.ListenPacket + if c.ListenPacket == nil { + c.ListenPacket = net.ListenPacket } if !strings.Contains(host, ":") { host += ":554" } - nconn, err := d.DialTimeout("tcp", host, d.ReadTimeout) + nconn, err := c.DialTimeout("tcp", host, c.ReadTimeout) if err != nil { return nil, err } return &ClientConn{ - d: d, + c: c, nconn: nconn, br: bufio.NewReaderSize(nconn, clientReadBufferSize), bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), @@ -103,20 +103,20 @@ func (d ClientConf) Dial(host string) (*ClientConn, error) { udpRtcpListeners: make(map[int]*clientConnUDPListener), rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), udpLastFrameTimes: make(map[int]*int64), - tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize), + tcpFrameBuffer: multibuffer.New(c.ReadBufferCount, clientTCPFrameReadBufferSize), rtcpSenders: make(map[int]*rtcpsender.RtcpSender), publishError: fmt.Errorf("not running"), }, nil } // DialRead connects to the address and starts reading all tracks. -func (d ClientConf) DialRead(address string) (*ClientConn, error) { +func (c ClientConf) DialRead(address string) (*ClientConn, error) { u, err := base.ParseURL(address) if err != nil { return nil, err } - conn, err := d.Dial(u.Host) + conn, err := c.Dial(u.Host) if err != nil { return nil, err } @@ -151,13 +151,13 @@ func (d ClientConf) DialRead(address string) (*ClientConn, error) { } // DialPublish connects to the address and starts publishing the tracks. -func (d ClientConf) DialPublish(address string, tracks Tracks) (*ClientConn, error) { +func (c ClientConf) DialPublish(address string, tracks Tracks) (*ClientConn, error) { u, err := base.ParseURL(address) if err != nil { return nil, err } - conn, err := d.Dial(u.Host) + conn, err := c.Dial(u.Host) if err != nil { return nil, err } diff --git a/clientconn.go b/clientconn.go index ea7efc89..8815e979 100644 --- a/clientconn.go +++ b/clientconn.go @@ -63,7 +63,7 @@ func (s clientConnState) String() string { // ClientConn is a client-side RTSP connection. type ClientConn struct { - d ClientConf + c ClientConf nconn net.Conn br *bufio.Reader bw *bufio.Writer @@ -146,7 +146,7 @@ func (c *ClientConn) Tracks() Tracks { } func (c *ClientConn) readFrameTCPOrResponse() (interface{}, error) { - c.nconn.SetReadDeadline(time.Now().Add(c.d.ReadTimeout)) + c.nconn.SetReadDeadline(time.Now().Add(c.c.ReadTimeout)) f := base.InterleavedFrame{ Content: c.tcpFrameBuffer.Next(), } @@ -175,7 +175,7 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) { c.cseq++ req.Header["CSeq"] = base.HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + c.nconn.SetWriteDeadline(time.Now().Add(c.c.WriteTimeout)) err := req.Write(c.bw) if err != nil { return nil, err @@ -298,7 +298,7 @@ func (c *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { if res.StatusCode != base.StatusOK { // redirect - if !c.d.RedirectDisable && + if !c.c.RedirectDisable && res.StatusCode >= base.StatusMovedPermanently && res.StatusCode <= base.StatusUseProxy && len(res.Header["Location"]) == 1 { @@ -310,7 +310,7 @@ func (c *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { return nil, nil, err } - nc, err := c.d.Dial(u.Host) + nc, err := c.c.Dial(u.Host) if err != nil { return nil, nil, err } @@ -385,8 +385,8 @@ func (c *ClientConn) Setup(mode headers.TransportMode, track *Track, } // protocol set by conf - if c.d.StreamProtocol != nil { - return *c.d.StreamProtocol + if c.c.StreamProtocol != nil { + return *c.c.StreamProtocol } // try udp @@ -493,7 +493,7 @@ func (c *ClientConn) Setup(mode headers.TransportMode, track *Track, // switch protocol automatically if res.StatusCode == base.StatusUnsupportedTransport && c.streamProtocol == nil && - c.d.StreamProtocol == nil { + c.c.StreamProtocol == nil { v := StreamProtocolTCP c.streamProtocol = &v diff --git a/clientconnpublish.go b/clientconnpublish.go index 32907591..c74cbfc7 100644 --- a/clientconnpublish.go +++ b/clientconnpublish.go @@ -164,7 +164,7 @@ func (c *ClientConn) backgroundRecordTCP() { for trackID := range c.rtcpSenders { r := c.rtcpSenders[trackID].Report(now) if r != nil { - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + c.nconn.SetWriteDeadline(time.Now().Add(c.c.WriteTimeout)) frame := base.InterleavedFrame{ TrackID: trackID, StreamType: StreamTypeRtcp, @@ -199,7 +199,7 @@ func (c *ClientConn) WriteFrame(trackID int, streamType StreamType, content []by return c.udpRtcpListeners[trackID].write(content) } - c.nconn.SetWriteDeadline(now.Add(c.d.WriteTimeout)) + c.nconn.SetWriteDeadline(now.Add(c.c.WriteTimeout)) frame := base.InterleavedFrame{ TrackID: trackID, StreamType: streamType, diff --git a/clientconnread.go b/clientconnread.go index 0503acd3..3eb44b7e 100644 --- a/clientconnread.go +++ b/clientconnread.go @@ -126,7 +126,7 @@ func (c *ClientConn) backgroundPlayUDP(onFrameDone chan error) { for _, lastUnix := range c.udpLastFrameTimes { last := time.Unix(atomic.LoadInt64(lastUnix), 0) - if now.Sub(last) >= c.d.ReadTimeout { + if now.Sub(last) >= c.c.ReadTimeout { c.nconn.SetReadDeadline(time.Now()) <-readerDone returnError = fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") @@ -180,7 +180,7 @@ func (c *ClientConn) backgroundPlayTCP(onFrameDone chan error) { for { select { case <-deadlineTicker.C: - c.nconn.SetReadDeadline(time.Now().Add(c.d.ReadTimeout)) + c.nconn.SetReadDeadline(time.Now().Add(c.c.ReadTimeout)) case <-c.backgroundTerminate: c.nconn.SetReadDeadline(time.Now()) @@ -192,7 +192,7 @@ func (c *ClientConn) backgroundPlayTCP(onFrameDone chan error) { now := time.Now() for trackID := range c.rtcpReceivers { r := c.rtcpReceivers[trackID].Report(now) - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + c.nconn.SetWriteDeadline(time.Now().Add(c.c.WriteTimeout)) frame := base.InterleavedFrame{ TrackID: trackID, StreamType: StreamTypeRtcp, diff --git a/clientconnudpl.go b/clientconnudpl.go index 7fd92d51..bbea1459 100644 --- a/clientconnudpl.go +++ b/clientconnudpl.go @@ -31,7 +31,7 @@ type clientConnUDPListener struct { } func newClientConnUDPListener(c *ClientConn, port int) (*clientConnUDPListener, error) { - pc, err := c.d.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) + pc, err := c.c.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) if err != nil { return nil, err } @@ -44,7 +44,7 @@ func newClientConnUDPListener(c *ClientConn, port int) (*clientConnUDPListener, return &clientConnUDPListener{ c: c, pc: pc, - udpFrameBuffer: multibuffer.New(c.d.ReadBufferCount, clientConnUDPReadBufferSize), + udpFrameBuffer: multibuffer.New(c.c.ReadBufferCount, clientConnUDPReadBufferSize), }, nil } @@ -92,7 +92,7 @@ func (l *clientConnUDPListener) run() { } func (l *clientConnUDPListener) write(buf []byte) error { - l.pc.SetWriteDeadline(time.Now().Add(l.c.d.WriteTimeout)) + l.pc.SetWriteDeadline(time.Now().Add(l.c.c.WriteTimeout)) _, err := l.pc.WriteTo(buf, &net.UDPAddr{ IP: l.remoteIP, Zone: l.remoteZone, diff --git a/serverconf.go b/serverconf.go index 18d54d65..3645552a 100644 --- a/serverconf.go +++ b/serverconf.go @@ -37,6 +37,19 @@ type ServerConf struct { // Serve starts a server on the given address. func (c ServerConf) Serve(address string, handler ServerHandler) (*Server, error) { + if c.ReadTimeout == 0 { + c.ReadTimeout = 10 * time.Second + } + if c.WriteTimeout == 0 { + c.WriteTimeout = 10 * time.Second + } + if c.ReadBufferCount == 0 { + c.ReadBufferCount = 1 + } + if c.ListenTCP == nil { + c.ListenTCP = net.ListenTCP + } + addr, err := net.ResolveTCPAddr("tcp", address) if err != nil { return nil, err