Files
go2rtc/pkg/core/track.go
2025-04-08 11:33:04 +03:00

222 lines
4.1 KiB
Go

package core
import (
"encoding/json"
"errors"
"github.com/pion/rtp"
)
var ErrCantGetTrack = errors.New("can't get track")
type Receiver struct {
Node
// Deprecated: should be removed
Media *Media `json:"-"`
// Deprecated: should be removed
ID byte `json:"-"` // Channel for RTSP, PayloadType for MPEG-TS
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
}
func NewReceiver(media *Media, codec *Codec) *Receiver {
r := &Receiver{
Node: Node{id: NewID(), Codec: codec},
Media: media,
}
r.Input = func(packet *Packet) {
r.Bytes += len(packet.Payload)
r.Packets++
for _, child := range r.childs {
child.Input(packet)
}
}
return r
}
// Deprecated: should be removed
func (r *Receiver) WriteRTP(packet *rtp.Packet) {
r.Input(packet)
}
// Deprecated: should be removed
func (r *Receiver) Senders() []*Sender {
if len(r.childs) > 0 {
return []*Sender{{}}
} else {
return nil
}
}
// Deprecated: should be removed
func (r *Receiver) Replace(target *Receiver) {
MoveNode(&target.Node, &r.Node)
}
func (r *Receiver) Close() {
r.Node.Close()
}
type Sender struct {
Node
// Deprecated:
Media *Media `json:"-"`
// Deprecated:
Handler HandlerFunc `json:"-"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
Drops int `json:"drops,omitempty"`
buf chan *Packet
done chan struct{}
}
func NewSender(media *Media, codec *Codec) *Sender {
var bufSize uint16
if GetKind(codec.Name) == KindVideo {
if codec.IsRTP() {
// in my tests 40Mbit/s 4K-video can generate up to 1500 items
// for the h264.RTPDepay => RTPPay queue
bufSize = 4096
} else {
bufSize = 64
}
} else {
bufSize = 128
}
buf := make(chan *Packet, bufSize)
s := &Sender{
Node: Node{id: NewID(), Codec: codec},
Media: media,
buf: buf,
}
s.Input = func(packet *Packet) {
s.mu.Lock()
if s.buf != nil {
// unblocked write to channel
select {
case s.buf <- packet:
s.Bytes += len(packet.Payload)
s.Packets++
default:
s.Drops++
}
} else {
s.Drops++
}
s.mu.Unlock()
}
s.Output = func(packet *Packet) {
s.Handler(packet)
}
return s
}
// Deprecated: should be removed
func (s *Sender) HandleRTP(parent *Receiver) {
s.WithParent(parent)
s.Start()
}
// Deprecated: should be removed
func (s *Sender) Bind(parent *Receiver) {
s.WithParent(parent)
}
func (s *Sender) WithParent(parent *Receiver) *Sender {
s.Node.WithParent(&parent.Node)
return s
}
func (s *Sender) Start() {
s.mu.Lock()
defer s.mu.Unlock()
if s.buf == nil || s.done != nil {
return
}
s.done = make(chan struct{})
// pass buf directly so that it's impossible for buf to be nil
go func(buf chan *Packet) {
for packet := range buf {
s.Output(packet)
}
close(s.done)
}(s.buf)
}
func (s *Sender) Wait() {
if done := s.done; done != nil {
<-done
}
}
func (s *Sender) State() string {
if s.buf == nil {
return "closed"
}
if s.done == nil {
return "new"
}
return "connected"
}
func (s *Sender) Close() {
// close buffer if exists
s.mu.Lock()
if s.buf != nil {
close(s.buf) // exit from for range loop
s.buf = nil // prevent writing to closed chan
}
s.mu.Unlock()
s.Node.Close()
}
func (r *Receiver) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Childs []uint32 `json:"childs,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
}{
ID: r.Node.id,
Codec: r.Node.Codec,
Bytes: r.Bytes,
Packets: r.Packets,
}
for _, child := range r.childs {
v.Childs = append(v.Childs, child.id)
}
return json.Marshal(v)
}
func (s *Sender) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Parent uint32 `json:"parent,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
Drops int `json:"drops,omitempty"`
}{
ID: s.Node.id,
Codec: s.Node.Codec,
Bytes: s.Bytes,
Packets: s.Packets,
Drops: s.Drops,
}
if s.parent != nil {
v.Parent = s.parent.id
}
return json.Marshal(v)
}