mirror of
https://github.com/aler9/gortsplib
synced 2025-10-08 08:30:06 +08:00
convert Tracks into Medias and Formats (#155)
* split tracks from medias * move tracks into dedicated package * move media into dedicated package * edit Medias.Marshal() in order to return SDP * add medias.Find() and simplify examples * improve coverage * fix rebase errors * replace TrackIDs with MediaIDs * implement media-specific and track-specific callbacks for reading RTCP and RTP packets * rename publish into record, read into play * add v2 tag * rename tracks into formats
This commit is contained in:
236
serverstream.go
236
serverstream.go
@@ -8,55 +8,73 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/aler9/gortsplib/pkg/liberrors"
|
||||
"github.com/aler9/gortsplib/pkg/rtcpsender"
|
||||
"github.com/aler9/gortsplib/v2/pkg/headers"
|
||||
"github.com/aler9/gortsplib/v2/pkg/liberrors"
|
||||
"github.com/aler9/gortsplib/v2/pkg/media"
|
||||
"github.com/aler9/gortsplib/v2/pkg/rtcpsender"
|
||||
)
|
||||
|
||||
type serverStreamTrack struct {
|
||||
lastSequenceNumber uint16
|
||||
lastSSRC uint32
|
||||
lastTimeFilled bool
|
||||
lastTimeRTP uint32
|
||||
lastTimeNTP time.Time
|
||||
rtcpSender *rtcpsender.RTCPSender
|
||||
multicastHandler *serverMulticastHandler
|
||||
}
|
||||
|
||||
// ServerStream represents a data stream.
|
||||
// This is in charge of
|
||||
// - distributing the stream to each reader
|
||||
// - allocating multicast listeners
|
||||
// - gathering infos about the stream in order to generate SSRC and RTP-Info
|
||||
type ServerStream struct {
|
||||
tracks Tracks
|
||||
medias media.Medias
|
||||
|
||||
mutex sync.RWMutex
|
||||
s *Server
|
||||
activeUnicastReaders map[*ServerSession]struct{}
|
||||
readers map[*ServerSession]struct{}
|
||||
streamTracks []*serverStreamTrack
|
||||
streamMedias map[*media.Media]*serverStreamMedia
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewServerStream allocates a ServerStream.
|
||||
func NewServerStream(tracks Tracks) *ServerStream {
|
||||
tracks = tracks.clone()
|
||||
tracks.setControls()
|
||||
|
||||
func NewServerStream(medias media.Medias) *ServerStream {
|
||||
st := &ServerStream{
|
||||
tracks: tracks,
|
||||
medias: medias,
|
||||
activeUnicastReaders: make(map[*ServerSession]struct{}),
|
||||
readers: make(map[*ServerSession]struct{}),
|
||||
}
|
||||
|
||||
st.streamTracks = make([]*serverStreamTrack, len(tracks))
|
||||
for i := range st.streamTracks {
|
||||
st.streamTracks[i] = &serverStreamTrack{}
|
||||
st.streamMedias = make(map[*media.Media]*serverStreamMedia, len(medias))
|
||||
for _, media := range medias {
|
||||
ssm := &serverStreamMedia{}
|
||||
|
||||
ssm.formats = make(map[uint8]*serverStreamFormat)
|
||||
for _, trak := range media.Formats {
|
||||
tr := &serverStreamFormat{
|
||||
format: trak,
|
||||
}
|
||||
|
||||
cmedia := media
|
||||
tr.rtcpSender = rtcpsender.New(
|
||||
trak.ClockRate(),
|
||||
func(pkt rtcp.Packet) {
|
||||
st.WritePacketRTCP(cmedia, pkt)
|
||||
},
|
||||
)
|
||||
|
||||
ssm.formats[trak.PayloadType()] = tr
|
||||
}
|
||||
|
||||
st.streamMedias[media] = ssm
|
||||
}
|
||||
|
||||
return st
|
||||
}
|
||||
|
||||
func (st *ServerStream) initializeServerDependentPart() {
|
||||
if !st.s.DisableRTCPSenderReports {
|
||||
for _, ssm := range st.streamMedias {
|
||||
for _, tr := range ssm.formats {
|
||||
tr.rtcpSender.Start(st.s.senderReportPeriod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes a ServerStream.
|
||||
func (st *ServerStream) Close() error {
|
||||
st.mutex.Lock()
|
||||
@@ -67,55 +85,85 @@ func (st *ServerStream) Close() error {
|
||||
ss.Close()
|
||||
}
|
||||
|
||||
for _, track := range st.streamTracks {
|
||||
if track.rtcpSender != nil {
|
||||
track.rtcpSender.Close()
|
||||
}
|
||||
if track.multicastHandler != nil {
|
||||
track.multicastHandler.close()
|
||||
}
|
||||
for _, sm := range st.streamMedias {
|
||||
sm.close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Tracks returns the tracks of the stream.
|
||||
func (st *ServerStream) Tracks() Tracks {
|
||||
return st.tracks
|
||||
// Medias returns the medias of the stream.
|
||||
func (st *ServerStream) Medias() media.Medias {
|
||||
return st.medias
|
||||
}
|
||||
|
||||
func (st *ServerStream) ssrc(trackID int) uint32 {
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
return st.streamTracks[trackID].lastSSRC
|
||||
}
|
||||
|
||||
func (st *ServerStream) rtpInfo(trackID int, now time.Time) (uint16, uint32, bool) {
|
||||
func (st *ServerStream) lastSSRC(medi *media.Media) (uint32, bool) {
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
track := st.streamTracks[trackID]
|
||||
sm := st.streamMedias[medi]
|
||||
|
||||
if !track.lastTimeFilled {
|
||||
return 0, 0, false
|
||||
// since lastSSRC() is used to fill SSRC inside the Transport header,
|
||||
// if there are multiple formats inside a single media stream,
|
||||
// do not return anything, since Transport headers don't support multiple SSRCs.
|
||||
if len(sm.formats) > 1 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
clockRate := st.tracks[trackID].ClockRate()
|
||||
var firstKey uint8
|
||||
for key := range sm.formats {
|
||||
firstKey = key
|
||||
break
|
||||
}
|
||||
|
||||
return sm.formats[firstKey].rtcpSender.LastSSRC()
|
||||
}
|
||||
|
||||
func (st *ServerStream) rtpInfoEntry(medi *media.Media, now time.Time) *headers.RTPInfoEntry {
|
||||
st.mutex.Lock()
|
||||
defer st.mutex.Unlock()
|
||||
|
||||
sm := st.streamMedias[medi]
|
||||
|
||||
// if there are multiple formats inside a single media stream,
|
||||
// do not generate a RTP-Info entry, since RTP-Info doesn't support
|
||||
// multiple sequence numbers / timestamps.
|
||||
if len(sm.formats) > 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var firstKey uint8
|
||||
for key := range sm.formats {
|
||||
firstKey = key
|
||||
break
|
||||
}
|
||||
|
||||
format := sm.formats[firstKey]
|
||||
|
||||
lastSeqNum, lastTimeRTP, lastTimeNTP, ok := format.rtcpSender.LastPacketData()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
clockRate := format.format.ClockRate()
|
||||
if clockRate == 0 {
|
||||
return 0, 0, false
|
||||
return nil
|
||||
}
|
||||
|
||||
// sequence number of the first packet of the stream
|
||||
seq := track.lastSequenceNumber + 1
|
||||
seqNum := lastSeqNum + 1
|
||||
|
||||
// RTP timestamp corresponding to the time value in
|
||||
// the Range response header.
|
||||
// remove a small quantity in order to avoid DTS > PTS
|
||||
ts := uint32(uint64(track.lastTimeRTP) +
|
||||
uint64(now.Sub(track.lastTimeNTP).Seconds()*float64(clockRate)) -
|
||||
ts := uint32(uint64(lastTimeRTP) +
|
||||
uint64(now.Sub(lastTimeNTP).Seconds()*float64(clockRate)) -
|
||||
uint64(clockRate)/10)
|
||||
|
||||
return seq, ts, true
|
||||
return &headers.RTPInfoEntry{
|
||||
SequenceNumber: &seqNum,
|
||||
Timestamp: &ts,
|
||||
}
|
||||
}
|
||||
|
||||
func (st *ServerStream) readerAdd(
|
||||
@@ -132,19 +180,7 @@ func (st *ServerStream) readerAdd(
|
||||
|
||||
if st.s == nil {
|
||||
st.s = ss.s
|
||||
|
||||
if !st.s.DisableRTCPSenderReports {
|
||||
for trackID, track := range st.streamTracks {
|
||||
cTrackID := trackID
|
||||
track.rtcpSender = rtcpsender.New(
|
||||
st.s.udpSenderReportPeriod,
|
||||
st.tracks[trackID].ClockRate(),
|
||||
func(pkt rtcp.Packet) {
|
||||
st.WritePacketRTCP(cTrackID, pkt)
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
st.initializeServerDependentPart()
|
||||
}
|
||||
|
||||
switch transport {
|
||||
@@ -154,7 +190,7 @@ func (st *ServerStream) readerAdd(
|
||||
if *r.setuppedTransport == TransportUDP &&
|
||||
r.author.ip().Equal(ss.author.ip()) &&
|
||||
r.author.zone() == ss.author.zone() {
|
||||
for _, rt := range r.setuppedTracks {
|
||||
for _, rt := range r.setuppedMedias {
|
||||
if rt.udpRTPReadPort == clientPorts[0] {
|
||||
return liberrors.ErrServerUDPPortsAlreadyInUse{Port: rt.udpRTPReadPort}
|
||||
}
|
||||
@@ -164,13 +200,10 @@ func (st *ServerStream) readerAdd(
|
||||
|
||||
case TransportUDPMulticast:
|
||||
// allocate multicast listeners
|
||||
for _, track := range st.streamTracks {
|
||||
if track.multicastHandler == nil {
|
||||
mh, err := newServerMulticastHandler(st.s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
track.multicastHandler = mh
|
||||
for _, media := range st.streamMedias {
|
||||
err := media.allocateMulticastHandler(st.s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -191,10 +224,10 @@ func (st *ServerStream) readerRemove(ss *ServerSession) {
|
||||
delete(st.readers, ss)
|
||||
|
||||
if len(st.readers) == 0 {
|
||||
for _, track := range st.streamTracks {
|
||||
if track.multicastHandler != nil {
|
||||
track.multicastHandler.close()
|
||||
track.multicastHandler = nil
|
||||
for _, media := range st.streamMedias {
|
||||
if media.multicastHandler != nil {
|
||||
media.multicastHandler.close()
|
||||
media.multicastHandler = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,9 +242,10 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) {
|
||||
}
|
||||
|
||||
if *ss.setuppedTransport == TransportUDPMulticast {
|
||||
for trackID, track := range ss.setuppedTracks {
|
||||
st.streamTracks[trackID].multicastHandler.rtcpl.addClient(
|
||||
ss.author.ip(), st.streamTracks[trackID].multicastHandler.rtcpl.port(), ss, track, false)
|
||||
for mediaID, sm := range ss.setuppedMedias {
|
||||
streamMedia := st.streamMedias[mediaID]
|
||||
streamMedia.multicastHandler.rtcpl.addClient(
|
||||
ss.author.ip(), streamMedia.multicastHandler.rtcpl.port(), sm)
|
||||
}
|
||||
} else {
|
||||
st.activeUnicastReaders[ss] = struct{}{}
|
||||
@@ -227,8 +261,9 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
|
||||
}
|
||||
|
||||
if *ss.setuppedTransport == TransportUDPMulticast {
|
||||
for trackID := range ss.setuppedTracks {
|
||||
st.streamTracks[trackID].multicastHandler.rtcpl.removeClient(ss)
|
||||
for mediaID, sm := range ss.setuppedMedias {
|
||||
streamMedia := st.streamMedias[mediaID]
|
||||
streamMedia.multicastHandler.rtcpl.removeClient(sm)
|
||||
}
|
||||
} else {
|
||||
delete(st.activeUnicastReaders, ss)
|
||||
@@ -236,14 +271,14 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
|
||||
}
|
||||
|
||||
// WritePacketRTP writes a RTP packet to all the readers of the stream.
|
||||
func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) {
|
||||
st.WritePacketRTPWithNTP(trackID, pkt, time.Now())
|
||||
func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) {
|
||||
st.WritePacketRTPWithNTP(medi, pkt, time.Now())
|
||||
}
|
||||
|
||||
// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
|
||||
// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports
|
||||
// that allows the receiver to reconstruct the absolute time of the packet.
|
||||
func (st *ServerStream) WritePacketRTPWithNTP(trackID int, pkt *rtp.Packet, ntp time.Time) {
|
||||
func (st *ServerStream) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) {
|
||||
byts := make([]byte, maxPacketSize)
|
||||
n, err := pkt.MarshalTo(byts)
|
||||
if err != nil {
|
||||
@@ -258,35 +293,28 @@ func (st *ServerStream) WritePacketRTPWithNTP(trackID int, pkt *rtp.Packet, ntp
|
||||
return
|
||||
}
|
||||
|
||||
track := st.streamTracks[trackID]
|
||||
ptsEqualsDTS := ptsEqualsDTS(st.tracks[trackID], pkt)
|
||||
sm := st.streamMedias[medi]
|
||||
|
||||
if ptsEqualsDTS {
|
||||
track.lastTimeFilled = true
|
||||
track.lastTimeRTP = pkt.Header.Timestamp
|
||||
track.lastTimeNTP = ntp
|
||||
}
|
||||
trak := sm.formats[pkt.PayloadType]
|
||||
|
||||
track.lastSequenceNumber = pkt.Header.SequenceNumber
|
||||
track.lastSSRC = pkt.Header.SSRC
|
||||
|
||||
if track.rtcpSender != nil {
|
||||
track.rtcpSender.ProcessPacketRTP(ntp, pkt, ptsEqualsDTS)
|
||||
}
|
||||
trak.rtcpSender.ProcessPacket(pkt, ntp, trak.format.PTSEqualsDTS(pkt))
|
||||
|
||||
// send unicast
|
||||
for r := range st.activeUnicastReaders {
|
||||
r.writePacketRTP(trackID, byts)
|
||||
sm, ok := r.setuppedMedias[medi]
|
||||
if ok {
|
||||
sm.writePacketRTP(byts)
|
||||
}
|
||||
}
|
||||
|
||||
// send multicast
|
||||
if track.multicastHandler != nil {
|
||||
track.multicastHandler.writePacketRTP(byts)
|
||||
if sm.multicastHandler != nil {
|
||||
sm.multicastHandler.writePacketRTP(byts)
|
||||
}
|
||||
}
|
||||
|
||||
// WritePacketRTCP writes a RTCP packet to all the readers of the stream.
|
||||
func (st *ServerStream) WritePacketRTCP(trackID int, pkt rtcp.Packet) {
|
||||
func (st *ServerStream) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) {
|
||||
byts, err := pkt.Marshal()
|
||||
if err != nil {
|
||||
return
|
||||
@@ -299,14 +327,18 @@ func (st *ServerStream) WritePacketRTCP(trackID int, pkt rtcp.Packet) {
|
||||
return
|
||||
}
|
||||
|
||||
sm := st.streamMedias[medi]
|
||||
|
||||
// send unicast
|
||||
for r := range st.activeUnicastReaders {
|
||||
r.writePacketRTCP(trackID, byts)
|
||||
sm, ok := r.setuppedMedias[medi]
|
||||
if ok {
|
||||
sm.writePacketRTCP(byts)
|
||||
}
|
||||
}
|
||||
|
||||
// send multicast
|
||||
track := st.streamTracks[trackID]
|
||||
if track.multicastHandler != nil {
|
||||
track.multicastHandler.writePacketRTCP(byts)
|
||||
if sm.multicastHandler != nil {
|
||||
sm.multicastHandler.writePacketRTCP(byts)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user