diff --git a/internal/hls/client.go b/internal/hls/client.go index 98e0e431..25de2cf4 100644 --- a/internal/hls/client.go +++ b/internal/hls/client.go @@ -16,10 +16,6 @@ import ( "time" "github.com/aler9/gortsplib" - "github.com/aler9/gortsplib/pkg/aac" - "github.com/aler9/gortsplib/pkg/h264" - "github.com/aler9/gortsplib/pkg/rtpaac" - "github.com/aler9/gortsplib/pkg/rtph264" "github.com/asticode/go-astits" "github.com/grafov/m3u8" "github.com/pion/rtp" @@ -51,339 +47,10 @@ func clientURLAbsolute(base *url.URL, relative string) (*url.URL, error) { return u, nil } -type clientSegmentQueue struct { - mutex sync.Mutex - queue [][]byte - didPush chan struct{} - didPull chan struct{} -} - -func newClientSegmentQueue() *clientSegmentQueue { - return &clientSegmentQueue{ - didPush: make(chan struct{}), - didPull: make(chan struct{}), - } -} - -func (q *clientSegmentQueue) push(seg []byte) { - q.mutex.Lock() - - queueWasEmpty := (len(q.queue) == 0) - q.queue = append(q.queue, seg) - - if queueWasEmpty { - close(q.didPush) - q.didPush = make(chan struct{}) - } - - q.mutex.Unlock() -} - -func (q *clientSegmentQueue) waitUntilSizeIsBelow(ctx context.Context, n int) { - q.mutex.Lock() - - for len(q.queue) > n { - q.mutex.Unlock() - - select { - case <-q.didPull: - case <-ctx.Done(): - return - } - - q.mutex.Lock() - } - - q.mutex.Unlock() -} - -func (q *clientSegmentQueue) waitAndPull(ctx context.Context) ([]byte, error) { - q.mutex.Lock() - - for len(q.queue) == 0 { - didPush := q.didPush - q.mutex.Unlock() - - select { - case <-didPush: - case <-ctx.Done(): - return nil, fmt.Errorf("terminated") - } - - q.mutex.Lock() - } - - var seg []byte - seg, q.queue = q.queue[0], q.queue[1:] - - close(q.didPull) - q.didPull = make(chan struct{}) - - q.mutex.Unlock() - return seg, nil -} - type clientAllocateProcsReq struct { res chan struct{} } -type clientVideoProcessorData struct { - data []byte - pts time.Duration - dts time.Duration -} - -type clientVideoProcessor struct { - ctx context.Context - onTrack func(gortsplib.Track) error - onPacket func(*rtp.Packet) - - queue chan clientVideoProcessorData - sps []byte - pps []byte - encoder *rtph264.Encoder - clockStartRTC time.Time -} - -func newClientVideoProcessor( - ctx context.Context, - onTrack func(gortsplib.Track) error, - onPacket func(*rtp.Packet), -) *clientVideoProcessor { - p := &clientVideoProcessor{ - ctx: ctx, - onTrack: onTrack, - onPacket: onPacket, - queue: make(chan clientVideoProcessorData, clientQueueSize), - } - - return p -} - -func (p *clientVideoProcessor) run() error { - for { - select { - case item := <-p.queue: - err := p.doProcess(item.data, item.pts, item.dts) - if err != nil { - return err - } - - case <-p.ctx.Done(): - return nil - } - } -} - -func (p *clientVideoProcessor) doProcess( - data []byte, - pts time.Duration, - dts time.Duration) error { - elapsed := time.Since(p.clockStartRTC) - if dts > elapsed { - select { - case <-p.ctx.Done(): - return fmt.Errorf("terminated") - case <-time.After(dts - elapsed): - } - } - - nalus, err := h264.DecodeAnnexB(data) - if err != nil { - return err - } - - outNALUs := make([][]byte, 0, len(nalus)) - - for _, nalu := range nalus { - typ := h264.NALUType(nalu[0] & 0x1F) - - switch typ { - case h264.NALUTypeSPS: - if p.sps == nil { - p.sps = append([]byte(nil), nalu...) - - if p.encoder == nil && p.pps != nil { - err := p.initializeEncoder() - if err != nil { - return err - } - } - } - - // remove since it's not needed - continue - - case h264.NALUTypePPS: - if p.pps == nil { - p.pps = append([]byte(nil), nalu...) - - if p.encoder == nil && p.sps != nil { - err := p.initializeEncoder() - if err != nil { - return err - } - } - } - - // remove since it's not needed - continue - - case h264.NALUTypeAccessUnitDelimiter: - // remove since it's not needed - continue - } - - outNALUs = append(outNALUs, nalu) - } - - if len(outNALUs) == 0 { - return nil - } - - if p.encoder == nil { - return nil - } - - pkts, err := p.encoder.Encode(outNALUs, pts) - if err != nil { - return fmt.Errorf("error while encoding H264: %v", err) - } - - for _, pkt := range pkts { - p.onPacket(pkt) - } - - return nil -} - -func (p *clientVideoProcessor) process( - data []byte, - pts time.Duration, - dts time.Duration) { - p.queue <- clientVideoProcessorData{data, pts, dts} -} - -func (p *clientVideoProcessor) initializeEncoder() error { - track, err := gortsplib.NewTrackH264(96, p.sps, p.pps, nil) - if err != nil { - return err - } - - p.encoder = rtph264.NewEncoder(96, nil, nil, nil) - - return p.onTrack(track) -} - -type clientAudioProcessorData struct { - data []byte - pts time.Duration -} - -type clientAudioProcessor struct { - ctx context.Context - onTrack func(gortsplib.Track) error - onPacket func(*rtp.Packet) - - queue chan clientAudioProcessorData - encoder *rtpaac.Encoder - clockStartRTC time.Time -} - -func newClientAudioProcessor( - ctx context.Context, - onTrack func(gortsplib.Track) error, - onPacket func(*rtp.Packet), -) *clientAudioProcessor { - p := &clientAudioProcessor{ - ctx: ctx, - onTrack: onTrack, - onPacket: onPacket, - queue: make(chan clientAudioProcessorData, clientQueueSize), - } - - return p -} - -func (p *clientAudioProcessor) run() error { - for { - select { - case item := <-p.queue: - err := p.doProcess(item.data, item.pts) - if err != nil { - return err - } - - case <-p.ctx.Done(): - return nil - } - } -} - -func (p *clientAudioProcessor) doProcess( - data []byte, - pts time.Duration) error { - adtsPkts, err := aac.DecodeADTS(data) - if err != nil { - return err - } - - aus := make([][]byte, 0, len(adtsPkts)) - - pktPts := pts - - now := time.Now() - - for _, pkt := range adtsPkts { - elapsed := now.Sub(p.clockStartRTC) - - if pktPts > elapsed { - select { - case <-p.ctx.Done(): - return fmt.Errorf("terminated") - case <-time.After(pktPts - elapsed): - } - } - - if p.encoder == nil { - track, err := gortsplib.NewTrackAAC(97, pkt.Type, pkt.SampleRate, pkt.ChannelCount, nil) - if err != nil { - return err - } - - p.encoder = rtpaac.NewEncoder(97, track.ClockRate(), nil, nil, nil) - - err = p.onTrack(track) - if err != nil { - return err - } - } - - aus = append(aus, pkt.AU) - pktPts += 1000 * time.Second / time.Duration(pkt.SampleRate) - } - - pkts, err := p.encoder.Encode(aus, pts) - if err != nil { - return fmt.Errorf("error while encoding AAC: %v", err) - } - - for _, pkt := range pkts { - p.onPacket(pkt) - } - - return nil -} - -func (p *clientAudioProcessor) process( - data []byte, - pts time.Duration) { - select { - case p.queue <- clientAudioProcessorData{data, pts}: - case <-p.ctx.Done(): - } -} - // ClientParent is the parent of a Client. type ClientParent interface { Log(level logger.Level, format string, args ...interface{}) diff --git a/internal/hls/client_audio_processor.go b/internal/hls/client_audio_processor.go new file mode 100644 index 00000000..28dea1f3 --- /dev/null +++ b/internal/hls/client_audio_processor.go @@ -0,0 +1,121 @@ +package hls + +import ( + "context" + "fmt" + "time" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/aac" + "github.com/aler9/gortsplib/pkg/rtpaac" + "github.com/pion/rtp" +) + +type clientAudioProcessorData struct { + data []byte + pts time.Duration +} + +type clientAudioProcessor struct { + ctx context.Context + onTrack func(gortsplib.Track) error + onPacket func(*rtp.Packet) + + queue chan clientAudioProcessorData + encoder *rtpaac.Encoder + clockStartRTC time.Time +} + +func newClientAudioProcessor( + ctx context.Context, + onTrack func(gortsplib.Track) error, + onPacket func(*rtp.Packet), +) *clientAudioProcessor { + p := &clientAudioProcessor{ + ctx: ctx, + onTrack: onTrack, + onPacket: onPacket, + queue: make(chan clientAudioProcessorData, clientQueueSize), + } + + return p +} + +func (p *clientAudioProcessor) run() error { + for { + select { + case item := <-p.queue: + err := p.doProcess(item.data, item.pts) + if err != nil { + return err + } + + case <-p.ctx.Done(): + return nil + } + } +} + +func (p *clientAudioProcessor) doProcess( + data []byte, + pts time.Duration) error { + adtsPkts, err := aac.DecodeADTS(data) + if err != nil { + return err + } + + aus := make([][]byte, 0, len(adtsPkts)) + + pktPts := pts + + now := time.Now() + + for _, pkt := range adtsPkts { + elapsed := now.Sub(p.clockStartRTC) + + if pktPts > elapsed { + select { + case <-p.ctx.Done(): + return fmt.Errorf("terminated") + case <-time.After(pktPts - elapsed): + } + } + + if p.encoder == nil { + track, err := gortsplib.NewTrackAAC(97, pkt.Type, pkt.SampleRate, pkt.ChannelCount, nil) + if err != nil { + return err + } + + p.encoder = rtpaac.NewEncoder(97, track.ClockRate(), nil, nil, nil) + + err = p.onTrack(track) + if err != nil { + return err + } + } + + aus = append(aus, pkt.AU) + pktPts += 1000 * time.Second / time.Duration(pkt.SampleRate) + } + + pkts, err := p.encoder.Encode(aus, pts) + if err != nil { + return fmt.Errorf("error while encoding AAC: %v", err) + } + + for _, pkt := range pkts { + p.onPacket(pkt) + } + + return nil +} + +func (p *clientAudioProcessor) process( + data []byte, + pts time.Duration) { + select { + case p.queue <- clientAudioProcessorData{data, pts}: + case <-p.ctx.Done(): + } +} diff --git a/internal/hls/client_segment_queue.go b/internal/hls/client_segment_queue.go new file mode 100644 index 00000000..b5a432dd --- /dev/null +++ b/internal/hls/client_segment_queue.go @@ -0,0 +1,79 @@ +package hls + +import ( + "context" + "fmt" + "sync" +) + +type clientSegmentQueue struct { + mutex sync.Mutex + queue [][]byte + didPush chan struct{} + didPull chan struct{} +} + +func newClientSegmentQueue() *clientSegmentQueue { + return &clientSegmentQueue{ + didPush: make(chan struct{}), + didPull: make(chan struct{}), + } +} + +func (q *clientSegmentQueue) push(seg []byte) { + q.mutex.Lock() + + queueWasEmpty := (len(q.queue) == 0) + q.queue = append(q.queue, seg) + + if queueWasEmpty { + close(q.didPush) + q.didPush = make(chan struct{}) + } + + q.mutex.Unlock() +} + +func (q *clientSegmentQueue) waitUntilSizeIsBelow(ctx context.Context, n int) { + q.mutex.Lock() + + for len(q.queue) > n { + q.mutex.Unlock() + + select { + case <-q.didPull: + case <-ctx.Done(): + return + } + + q.mutex.Lock() + } + + q.mutex.Unlock() +} + +func (q *clientSegmentQueue) waitAndPull(ctx context.Context) ([]byte, error) { + q.mutex.Lock() + + for len(q.queue) == 0 { + didPush := q.didPush + q.mutex.Unlock() + + select { + case <-didPush: + case <-ctx.Done(): + return nil, fmt.Errorf("terminated") + } + + q.mutex.Lock() + } + + var seg []byte + seg, q.queue = q.queue[0], q.queue[1:] + + close(q.didPull) + q.didPull = make(chan struct{}) + + q.mutex.Unlock() + return seg, nil +} diff --git a/internal/hls/client_video_processor.go b/internal/hls/client_video_processor.go new file mode 100644 index 00000000..360dc2f0 --- /dev/null +++ b/internal/hls/client_video_processor.go @@ -0,0 +1,160 @@ +package hls + +import ( + "context" + "fmt" + "time" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/h264" + "github.com/aler9/gortsplib/pkg/rtph264" + "github.com/pion/rtp" +) + +type clientVideoProcessorData struct { + data []byte + pts time.Duration + dts time.Duration +} + +type clientVideoProcessor struct { + ctx context.Context + onTrack func(gortsplib.Track) error + onPacket func(*rtp.Packet) + + queue chan clientVideoProcessorData + sps []byte + pps []byte + encoder *rtph264.Encoder + clockStartRTC time.Time +} + +func newClientVideoProcessor( + ctx context.Context, + onTrack func(gortsplib.Track) error, + onPacket func(*rtp.Packet), +) *clientVideoProcessor { + p := &clientVideoProcessor{ + ctx: ctx, + onTrack: onTrack, + onPacket: onPacket, + queue: make(chan clientVideoProcessorData, clientQueueSize), + } + + return p +} + +func (p *clientVideoProcessor) run() error { + for { + select { + case item := <-p.queue: + err := p.doProcess(item.data, item.pts, item.dts) + if err != nil { + return err + } + + case <-p.ctx.Done(): + return nil + } + } +} + +func (p *clientVideoProcessor) doProcess( + data []byte, + pts time.Duration, + dts time.Duration) error { + elapsed := time.Since(p.clockStartRTC) + if dts > elapsed { + select { + case <-p.ctx.Done(): + return fmt.Errorf("terminated") + case <-time.After(dts - elapsed): + } + } + + nalus, err := h264.DecodeAnnexB(data) + if err != nil { + return err + } + + outNALUs := make([][]byte, 0, len(nalus)) + + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + + switch typ { + case h264.NALUTypeSPS: + if p.sps == nil { + p.sps = append([]byte(nil), nalu...) + + if p.encoder == nil && p.pps != nil { + err := p.initializeEncoder() + if err != nil { + return err + } + } + } + + // remove since it's not needed + continue + + case h264.NALUTypePPS: + if p.pps == nil { + p.pps = append([]byte(nil), nalu...) + + if p.encoder == nil && p.sps != nil { + err := p.initializeEncoder() + if err != nil { + return err + } + } + } + + // remove since it's not needed + continue + + case h264.NALUTypeAccessUnitDelimiter: + // remove since it's not needed + continue + } + + outNALUs = append(outNALUs, nalu) + } + + if len(outNALUs) == 0 { + return nil + } + + if p.encoder == nil { + return nil + } + + pkts, err := p.encoder.Encode(outNALUs, pts) + if err != nil { + return fmt.Errorf("error while encoding H264: %v", err) + } + + for _, pkt := range pkts { + p.onPacket(pkt) + } + + return nil +} + +func (p *clientVideoProcessor) process( + data []byte, + pts time.Duration, + dts time.Duration) { + p.queue <- clientVideoProcessorData{data, pts, dts} +} + +func (p *clientVideoProcessor) initializeEncoder() error { + track, err := gortsplib.NewTrackH264(96, p.sps, p.pps, nil) + if err != nil { + return err + } + + p.encoder = rtph264.NewEncoder(96, nil, nil, nil) + + return p.onTrack(track) +}