merge WriteFrameTCP and WriteFrameUDP

This commit is contained in:
aler9
2020-11-08 20:01:50 +01:00
parent 7c91f02459
commit 2deddcffab
4 changed files with 21 additions and 17 deletions

View File

@@ -96,6 +96,7 @@ type ConnClient struct {
response *base.Response response *base.Response
frame *base.InterleavedFrame frame *base.InterleavedFrame
tcpFrameBuffer *multibuffer.MultiBuffer tcpFrameBuffer *multibuffer.MultiBuffer
writeFrameFunc func(trackId int, streamType StreamType, content []byte) error
reportWriterTerminate chan struct{} reportWriterTerminate chan struct{}
reportWriterDone chan struct{} reportWriterDone chan struct{}
@@ -240,17 +241,14 @@ func (c *ConnClient) ReadFrameTCP() (int, StreamType, []byte, error) {
return c.frame.TrackId, c.frame.StreamType, c.frame.Content, nil return c.frame.TrackId, c.frame.StreamType, c.frame.Content, nil
} }
// WriteFrameUDP writes an UDP frame. func (c *ConnClient) writeFrameUDP(trackId int, streamType StreamType, content []byte) error {
func (c *ConnClient) WriteFrameUDP(trackId int, streamType StreamType, content []byte) error {
if streamType == StreamTypeRtp { if streamType == StreamTypeRtp {
return c.udpRtpListeners[trackId].write(content) return c.udpRtpListeners[trackId].write(content)
} }
return c.udpRtcpListeners[trackId].write(content) return c.udpRtcpListeners[trackId].write(content)
} }
// WriteFrameTCP writes an interleaved frame. func (c *ConnClient) writeFrameTCP(trackId int, streamType StreamType, content []byte) error {
// this can't be used when reading.
func (c *ConnClient) WriteFrameTCP(trackId int, streamType StreamType, content []byte) error {
frame := base.InterleavedFrame{ frame := base.InterleavedFrame{
TrackId: trackId, TrackId: trackId,
StreamType: streamType, StreamType: streamType,
@@ -261,6 +259,12 @@ func (c *ConnClient) WriteFrameTCP(trackId int, streamType StreamType, content [
return frame.Write(c.bw) return frame.Write(c.bw)
} }
// WriteFrame writes a frame.
// This can be used only after Record().
func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error {
return c.writeFrameFunc(trackId, streamType, content)
}
// Do writes a Request and reads a Response. // Do writes a Request and reads a Response.
// Interleaved frames sent before the response are ignored. // Interleaved frames sent before the response are ignored.
func (c *ConnClient) Do(req *base.Request) (*base.Response, error) { func (c *ConnClient) Do(req *base.Request) (*base.Response, error) {
@@ -418,7 +422,7 @@ func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) {
} }
} }
// build an URL by merging baseUrl with the control attribute from track.Media // build an URL by merging baseUrl with the control attribute from track.Media.
func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode, track *Track) *base.URL { func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode, track *Track) *base.URL {
control := func() string { control := func() string {
// if we're reading, get control from track ID // if we're reading, get control from track ID
@@ -696,13 +700,7 @@ func (c *ConnClient) Play() (*base.Response, error) {
case <-reportWriterTicker.C: case <-reportWriterTicker.C:
for trackId := range c.rtcpReceivers { for trackId := range c.rtcpReceivers {
frame := c.rtcpReceivers[trackId].Report() frame := c.rtcpReceivers[trackId].Report()
c.WriteFrame(trackId, StreamTypeRtcp, frame)
if *c.streamProtocol == StreamProtocolUDP {
c.udpRtcpListeners[trackId].write(frame)
} else {
c.WriteFrameTCP(trackId, StreamTypeRtcp, frame)
}
} }
} }
} }
@@ -766,6 +764,12 @@ func (c *ConnClient) Record() (*base.Response, error) {
c.state = connClientStateRecord c.state = connClientStateRecord
if *c.streamProtocol == StreamProtocolUDP {
c.writeFrameFunc = c.writeFrameUDP
} else {
c.writeFrameFunc = c.writeFrameTCP
}
return nil, nil return nil, nil
} }

View File

@@ -249,7 +249,7 @@ func TestDialPublishUDP(t *testing.T) {
break break
} }
err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
break break
} }
@@ -349,7 +349,7 @@ func TestDialPublishTCP(t *testing.T) {
break break
} }
err = conn.WriteFrameTCP(track.Id, StreamTypeRtp, buf[:n]) err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
break break
} }

View File

@@ -57,7 +57,7 @@ func main() {
} }
// write frames to the server // write frames to the server
err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) err = conn.WriteFrame(track.Id, gortsplib.StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
fmt.Println("connection is closed (%s)", err) fmt.Println("connection is closed (%s)", err)
break break

View File

@@ -69,7 +69,7 @@ func main() {
} }
// write frames to the server // write frames to the server
err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) err = conn.WriteFrame(track.Id, gortsplib.StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
break break
} }