add BytesSent to ServerStream (#457)

* add BytesSent to ServerStream (to allow MediaMTX to gather bytes sent on all paths)

* improve performance

* add test

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
This commit is contained in:
Dr. Ralf S. Engelschall
2023-11-04 17:46:22 +01:00
committed by GitHub
parent 35bf96c5ec
commit 0933bf9975
3 changed files with 88 additions and 1 deletions

View File

@@ -673,7 +673,7 @@ func TestServerPlay(t *testing.T) {
v := headers.TransportDeliveryUnicast v := headers.TransportDeliveryUnicast
inTH.Delivery = &v inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{5, 6} // off value inTH.InterleavedIDs = &[2]int{5, 6} // odd value
} }
res, th := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "") res, th := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "")
@@ -2122,3 +2122,76 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
require.Equal(t, testRTPPacketMarshaled, f.Payload) require.Equal(t, testRTPPacketMarshaled, f.Payload)
} }
} }
func TestServerPlayBytesSent(t *testing.T) {
var stream *ServerStream
s := &Server{
RTSPAddress: "localhost:8554",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8000,
MulticastRTCPPort: 8001,
Handler: &testServerHandler{
onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}})
defer stream.Close()
for _, transport := range []string{"tcp", "multicast"} {
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
inTH := &headers.Transport{
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
}
if transport == "multicast" {
v := headers.TransportDeliveryMulticast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
} else {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
res, _ := doSetup(t, conn, absoluteControlAttribute(desc.MediaDescriptions[0]), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
}
err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
require.Equal(t, uint64(16*2), stream.BytesSent())
}

View File

@@ -2,6 +2,7 @@ package gortsplib
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
@@ -37,6 +38,7 @@ type ServerStream struct {
activeUnicastReaders map[*ServerSession]struct{} activeUnicastReaders map[*ServerSession]struct{}
streamMedias map[*description.Media]*serverStreamMedia streamMedias map[*description.Media]*serverStreamMedia
closed bool closed bool
bytesSent *uint64
} }
// NewServerStream allocates a ServerStream. // NewServerStream allocates a ServerStream.
@@ -46,6 +48,7 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
desc: desc, desc: desc,
readers: make(map[*ServerSession]struct{}), readers: make(map[*ServerSession]struct{}),
activeUnicastReaders: make(map[*ServerSession]struct{}), activeUnicastReaders: make(map[*ServerSession]struct{}),
bytesSent: new(uint64),
} }
st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias))
@@ -71,6 +74,11 @@ func (st *ServerStream) Close() {
} }
} }
// BytesSent returns the number of written bytes.
func (st *ServerStream) BytesSent() uint64 {
return atomic.LoadUint64(st.bytesSent)
}
// Description returns the description of the stream. // Description returns the description of the stream.
func (st *ServerStream) Description() *description.Session { func (st *ServerStream) Description() *description.Session {
return st.desc return st.desc

View File

@@ -1,6 +1,7 @@
package gortsplib package gortsplib
import ( import (
"sync/atomic"
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
@@ -39,6 +40,8 @@ func newServerStreamFormat(sm *serverStreamMedia, forma format.Format) *serverSt
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
le := uint64(len(byts))
// send unicast // send unicast
for r := range sf.sm.st.activeUnicastReaders { for r := range sf.sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sf.sm.media] sm, ok := r.setuppedMedias[sf.sm.media]
@@ -46,6 +49,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
err := sm.writePacketRTP(byts) err := sm.writePacketRTP(byts)
if err != nil { if err != nil {
r.onStreamWriteError(err) r.onStreamWriteError(err)
} else {
atomic.AddUint64(sf.sm.st.bytesSent, le)
} }
} }
} }
@@ -56,6 +61,7 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
if err != nil { if err != nil {
return err return err
} }
atomic.AddUint64(sf.sm.st.bytesSent, le)
} }
return nil return nil