// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT //go:build !js // +build !js package webrtc import ( "fmt" "io" "sync" "time" "github.com/pion/interceptor" "github.com/pion/rtp" ) type peekedPacket struct { payload []byte attributes interceptor.Attributes } // TrackRemote represents a single inbound source of media. type TrackRemote struct { mu sync.RWMutex id string streamID string payloadType PayloadType kind RTPCodecType ssrc SSRC rtxSsrc SSRC codec RTPCodecParameters params RTPParameters rid string receiver *RTPReceiver peekedPackets []*peekedPacket audioPlayoutStatsProviders []AudioPlayoutStatsProvider } func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote { return &TrackRemote{ kind: kind, ssrc: ssrc, rtxSsrc: rtxSsrc, rid: rid, receiver: receiver, } } // ID is the unique identifier for this Track. This should be unique for the // stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' // and StreamID would be 'desktop' or 'webcam'. func (t *TrackRemote) ID() string { t.mu.RLock() defer t.mu.RUnlock() return t.id } // RID gets the RTP Stream ID of this Track // With Simulcast you will have multiple tracks with the same ID, but different RID values. // In many cases a TrackRemote will not have an RID, so it is important to assert it is non-zero. func (t *TrackRemote) RID() string { t.mu.RLock() defer t.mu.RUnlock() return t.rid } // PayloadType gets the PayloadType of the track. func (t *TrackRemote) PayloadType() PayloadType { t.mu.RLock() defer t.mu.RUnlock() return t.payloadType } // Kind gets the Kind of the track. func (t *TrackRemote) Kind() RTPCodecType { t.mu.RLock() defer t.mu.RUnlock() return t.kind } // StreamID is the group this track belongs too. This must be unique. func (t *TrackRemote) StreamID() string { t.mu.RLock() defer t.mu.RUnlock() return t.streamID } // SSRC gets the SSRC of the track. func (t *TrackRemote) SSRC() SSRC { t.mu.RLock() defer t.mu.RUnlock() return t.ssrc } // Msid gets the Msid of the track. func (t *TrackRemote) Msid() string { return t.StreamID() + " " + t.ID() } // Codec gets the Codec of the track. func (t *TrackRemote) Codec() RTPCodecParameters { t.mu.RLock() defer t.mu.RUnlock() return t.codec } // Read reads data from the track. func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, err error) { t.mu.RLock() receiver := t.receiver var peekedPkt *peekedPacket if len(t.peekedPackets) != 0 { peekedPkt = t.peekedPackets[0] t.peekedPackets = t.peekedPackets[1:] } t.mu.RUnlock() if receiver.haveClosed() { return 0, nil, io.EOF } if peekedPkt != nil { n = copy(b, peekedPkt.payload) err = t.checkAndUpdateTrack(b) return n, peekedPkt.attributes, err } // If there's a separate RTX track and an RTX packet is available, return that if rtxPacketReceived := receiver.readRTX(t); rtxPacketReceived != nil { n = copy(b, rtxPacketReceived.pkt) attributes = rtxPacketReceived.attributes rtxPacketReceived.release() return n, attributes, nil } n, attributes, err = receiver.readRTP(b, t) if err != nil { return n, attributes, err } err = t.checkAndUpdateTrack(b) return n, attributes, err } // checkAndUpdateTrack checks payloadType for every incoming packet // once a different payloadType is detected the track will be updated. func (t *TrackRemote) checkAndUpdateTrack(b []byte) error { if len(b) < 2 { return errRTPTooShort } payloadType := PayloadType(b[1] & rtpPayloadTypeBitmask) if payloadType != t.PayloadType() || len(t.params.Codecs) == 0 { t.mu.Lock() defer t.mu.Unlock() params, err := t.receiver.api.mediaEngine.getRTPParametersByPayloadType(payloadType) if err != nil { return err } t.kind = t.receiver.kind t.payloadType = payloadType t.codec = params.Codecs[0] t.params = params } return nil } // ReadRTP is a convenience method that wraps Read and unmarshals for you. func (t *TrackRemote) ReadRTP() (*rtp.Packet, interceptor.Attributes, error) { b := make([]byte, t.receiver.api.settingEngine.getReceiveMTU()) i, attributes, err := t.Read(b) if err != nil { return nil, nil, err } r := &rtp.Packet{} if err := r.Unmarshal(b[:i]); err != nil { return nil, nil, err } return r, attributes, nil } // peek is like Read, but it doesn't discard the packet read. func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error) { n, a, err = t.Read(b) if err != nil { return } t.mu.Lock() // this might overwrite data if somebody peeked between the Read // and us getting the lock. Oh well, we'll just drop a packet in // that case. data := make([]byte, n) n = copy(data, b[:n]) t.peekedPackets = append(t.peekedPackets, &peekedPacket{payload: data, attributes: a}) t.mu.Unlock() return } // SetReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever. func (t *TrackRemote) SetReadDeadline(deadline time.Time) error { return t.receiver.setRTPReadDeadline(deadline, t) } // RtxSSRC returns the RTX SSRC for a track, or 0 if track does not have a separate RTX stream. func (t *TrackRemote) RtxSSRC() SSRC { t.mu.RLock() defer t.mu.RUnlock() return t.rtxSsrc } // HasRTX returns true if the track has a separate RTX stream. func (t *TrackRemote) HasRTX() bool { t.mu.RLock() defer t.mu.RUnlock() return t.rtxSsrc != 0 } func (t *TrackRemote) addProvider(provider AudioPlayoutStatsProvider) { t.mu.Lock() defer t.mu.Unlock() for _, p := range t.audioPlayoutStatsProviders { if p == provider { return } } t.audioPlayoutStatsProviders = append(t.audioPlayoutStatsProviders, provider) } func (t *TrackRemote) removeProvider(provider AudioPlayoutStatsProvider) { t.mu.Lock() defer t.mu.Unlock() for i, p := range t.audioPlayoutStatsProviders { if p == provider { t.audioPlayoutStatsProviders = append(t.audioPlayoutStatsProviders[:i], t.audioPlayoutStatsProviders[i+1:]...) return } } } func (t *TrackRemote) pullAudioPlayoutStats(now time.Time) []AudioPlayoutStats { t.mu.RLock() providers := t.audioPlayoutStatsProviders t.mu.RUnlock() if len(providers) == 0 { return nil } var allStats []AudioPlayoutStats for _, provider := range providers { stats, ok := provider.Snapshot(now) if !ok { continue } if stats.ID == "" { stats.ID = fmt.Sprintf("media-playout-%d", uint32(t.SSRC())) } if stats.Type == "" { stats.Type = StatsTypeMediaPlayout } if stats.Kind == "" { stats.Kind = string(MediaKindAudio) } if stats.Timestamp == 0 { stats.Timestamp = statsTimestampFrom(now) } allStats = append(allStats, stats) } return allStats } func (t *TrackRemote) setRtxSSRC(ssrc SSRC) { t.mu.Lock() defer t.mu.Unlock() t.rtxSsrc = ssrc }