handle deadlines; change ConnClient.WriteRequest()

This commit is contained in:
aler9
2020-05-03 15:15:44 +02:00
parent fc0216b54c
commit a171a9e328
2 changed files with 46 additions and 28 deletions

View File

@@ -4,28 +4,33 @@ import (
"bufio" "bufio"
"net" "net"
"strconv" "strconv"
"time"
) )
// ConnClient is a client-side RTSP connection. // ConnClient is a client-side RTSP connection.
type ConnClient struct { type ConnClient struct {
nconn net.Conn nconn net.Conn
br *bufio.Reader br *bufio.Reader
bw *bufio.Writer bw *bufio.Writer
session string readTimeout time.Duration
curCseq int writeTimeout time.Duration
authProv *AuthClient session string
curCSeq int
authProv *AuthClient
} }
// NewConnClient allocates a ConnClient. // NewConnClient allocates a ConnClient.
func NewConnClient(nconn net.Conn) *ConnClient { func NewConnClient(nconn net.Conn, readTimeout time.Duration, writeTimeout time.Duration) *ConnClient {
return &ConnClient{ return &ConnClient{
nconn: nconn, nconn: nconn,
br: bufio.NewReaderSize(nconn, 4096), br: bufio.NewReaderSize(nconn, 4096),
bw: bufio.NewWriterSize(nconn, 4096), bw: bufio.NewWriterSize(nconn, 4096),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
} }
} }
// NetConn returns the underlying new.Conn. // NetConn returns the underlying net.Conn.
func (c *ConnClient) NetConn() net.Conn { func (c *ConnClient) NetConn() net.Conn {
return c.nconn return c.nconn
} }
@@ -43,8 +48,8 @@ func (c *ConnClient) SetCredentials(wwwAuthenticateHeader []string, user string,
return err return err
} }
// WriteRequest writes a Request. // WriteRequest writes a request and reads a response.
func (c *ConnClient) WriteRequest(req *Request) error { func (c *ConnClient) WriteRequest(req *Request) (*Response, error) {
if c.session != "" { if c.session != "" {
if req.Header == nil { if req.Header == nil {
req.Header = make(Header) req.Header = make(Header)
@@ -59,27 +64,31 @@ func (c *ConnClient) WriteRequest(req *Request) error {
req.Header["Authorization"] = c.authProv.GenerateHeader(req.Method, req.Url) req.Header["Authorization"] = c.authProv.GenerateHeader(req.Method, req.Url)
} }
// automatically insert cseq into every outgoing request // automatically insert CSeq
if req.Header == nil { if req.Header == nil {
req.Header = make(Header) req.Header = make(Header)
} }
c.curCseq += 1 c.curCSeq += 1
req.Header["CSeq"] = []string{strconv.FormatInt(int64(c.curCseq), 10)} req.Header["CSeq"] = []string{strconv.FormatInt(int64(c.curCSeq), 10)}
return req.write(c.bw) c.nconn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
} err := req.write(c.bw)
if err != nil {
return nil, err
}
// ReadResponse reads a response. c.nconn.SetReadDeadline(time.Now().Add(c.readTimeout))
func (c *ConnClient) ReadResponse() (*Response, error) {
return readResponse(c.br) return readResponse(c.br)
} }
// ReadInterleavedFrame reads an InterleavedFrame. // ReadInterleavedFrame reads an InterleavedFrame.
func (c *ConnClient) ReadInterleavedFrame() (*InterleavedFrame, error) { func (c *ConnClient) ReadInterleavedFrame() (*InterleavedFrame, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.readTimeout))
return readInterleavedFrame(c.br) return readInterleavedFrame(c.br)
} }
// WriteInterleavedFrame writes an InterleavedFrame. // WriteInterleavedFrame writes an InterleavedFrame.
func (c *ConnClient) WriteInterleavedFrame(frame *InterleavedFrame) error { func (c *ConnClient) WriteInterleavedFrame(frame *InterleavedFrame) error {
c.nconn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
return frame.write(c.bw) return frame.write(c.bw)
} }

View File

@@ -3,45 +3,54 @@ package gortsplib
import ( import (
"bufio" "bufio"
"net" "net"
"time"
) )
// ConnServer is a server-side RTSP connection. // ConnServer is a server-side RTSP connection.
type ConnServer struct { type ConnServer struct {
nconn net.Conn nconn net.Conn
br *bufio.Reader br *bufio.Reader
bw *bufio.Writer bw *bufio.Writer
readTimeout time.Duration
writeTimeout time.Duration
} }
// NewConnServer allocates a ConnClient. // NewConnServer allocates a ConnClient.
func NewConnServer(nconn net.Conn) *ConnServer { func NewConnServer(nconn net.Conn, readTimeout time.Duration, writeTimeout time.Duration) *ConnServer {
return &ConnServer{ return &ConnServer{
nconn: nconn, nconn: nconn,
br: bufio.NewReaderSize(nconn, 4096), br: bufio.NewReaderSize(nconn, 4096),
bw: bufio.NewWriterSize(nconn, 4096), bw: bufio.NewWriterSize(nconn, 4096),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
} }
} }
// NetConn returns the underlying new.Conn. // NetConn returns the underlying net.Conn.
func (s *ConnServer) NetConn() net.Conn { func (s *ConnServer) NetConn() net.Conn {
return s.nconn return s.nconn
} }
// ReadRequest reads a Request. // ReadRequest reads a Request.
func (s *ConnServer) ReadRequest() (*Request, error) { func (s *ConnServer) ReadRequest() (*Request, error) {
s.nconn.SetReadDeadline(time.Time{}) // disable deadline
return readRequest(s.br) return readRequest(s.br)
} }
// WriteResponse writes a response. // WriteResponse writes a response.
func (s *ConnServer) WriteResponse(res *Response) error { func (s *ConnServer) WriteResponse(res *Response) error {
s.nconn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
return res.write(s.bw) return res.write(s.bw)
} }
// ReadInterleavedFrame reads an InterleavedFrame. // ReadInterleavedFrame reads an InterleavedFrame.
func (s *ConnServer) ReadInterleavedFrame() (*InterleavedFrame, error) { func (s *ConnServer) ReadInterleavedFrame() (*InterleavedFrame, error) {
s.nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
return readInterleavedFrame(s.br) return readInterleavedFrame(s.br)
} }
// WriteInterleavedFrame writes an InterleavedFrame. // WriteInterleavedFrame writes an InterleavedFrame.
func (s *ConnServer) WriteInterleavedFrame(frame *InterleavedFrame) error { func (s *ConnServer) WriteInterleavedFrame(frame *InterleavedFrame) error {
s.nconn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
return frame.write(s.bw) return frame.write(s.bw)
} }