mirror of
				https://github.com/lkmio/lkm.git
				synced 2025-10-31 18:42:37 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			311 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			311 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package rtsp
 | |
| 
 | |
| import (
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"github.com/lkmio/avformat"
 | |
| 	"github.com/lkmio/avformat/avc"
 | |
| 	"github.com/lkmio/avformat/collections"
 | |
| 	"github.com/lkmio/avformat/utils"
 | |
| 	"github.com/lkmio/lkm/log"
 | |
| 	"github.com/lkmio/lkm/stream"
 | |
| 	"github.com/lkmio/rtp"
 | |
| 	"github.com/pion/sdp/v3"
 | |
| 	"net"
 | |
| 	"strconv"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	OverTcpHeaderSize = 4
 | |
| 	OverTcpMagic      = 0x24
 | |
| )
 | |
| 
 | |
| // TransStream rtsp传输流封装
 | |
| type TransStream struct {
 | |
| 	stream.BaseTransStream
 | |
| 	addr      net.IPAddr
 | |
| 	addrType  string
 | |
| 	urlFormat string
 | |
| 
 | |
| 	sdp             string
 | |
| 	RtspTracks      []*Track
 | |
| 	lastEndSeq      map[utils.AVCodecID]uint16  // 上次结束推流的rtp seq
 | |
| 	segments        []stream.TransStreamSegment // 缓存的切片
 | |
| 	packetAllocator *stream.RtpBuffer           // 分配rtp包
 | |
| }
 | |
| 
 | |
| func (t *TransStream) OverTCP(data []byte, channel int) {
 | |
| 	data[0] = OverTcpMagic
 | |
| 	data[1] = byte(channel)
 | |
| 	binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
 | |
| }
 | |
| 
 | |
| func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
 | |
| 	var ts uint32
 | |
| 	var result []*collections.ReferenceCounter[[]byte]
 | |
| 	track := t.RtspTracks[trackIndex]
 | |
| 
 | |
| 	duration := packet.GetDuration(track.payload.ClockRate)
 | |
| 	//dts := t.Tracks[trackIndex].Dts
 | |
| 	ts = uint32(t.Tracks[trackIndex].Pts)
 | |
| 	t.Tracks[trackIndex].Dts += duration
 | |
| 	t.Tracks[trackIndex].Pts = t.Tracks[trackIndex].Dts + packet.GetPtsDtsDelta(track.payload.ClockRate)
 | |
| 
 | |
| 	if utils.AVMediaTypeAudio == packet.MediaType {
 | |
| 		result = t.PackRtpPayload(track, trackIndex, packet.Data, ts, false)
 | |
| 	} else if utils.AVMediaTypeVideo == packet.MediaType {
 | |
| 		annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
 | |
| 		result = t.PackRtpPayload(track, trackIndex, annexBData, ts, packet.Key)
 | |
| 	}
 | |
| 
 | |
| 	return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
 | |
| }
 | |
| 
 | |
| func (t *TransStream) ReadKeyFrameBuffer() ([]stream.TransStreamSegment, error) {
 | |
| 	// 默认不开启rtsp的关键帧缓存, 一次发送rtp包过多, 播放器的jitter buffer可能会溢出丢弃, 造成播放花屏
 | |
| 	//return t.segments, nil
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| // PackRtpPayload 打包返回rtp over tcp的数据包
 | |
| func (t *TransStream) PackRtpPayload(track *Track, trackIndex int, data []byte, timestamp uint32, videoKey bool) []*collections.ReferenceCounter[[]byte] {
 | |
| 	// 分割nalu
 | |
| 	var payloads [][]byte
 | |
| 	if utils.AVCodecIdH264 == track.CodecID || utils.AVCodecIdH265 == track.CodecID {
 | |
| 		avc.SplitNalU(data, func(nalu []byte) {
 | |
| 			payloads = append(payloads, avc.RemoveStartCode(nalu))
 | |
| 		})
 | |
| 	} else {
 | |
| 		payloads = append(payloads, data)
 | |
| 	}
 | |
| 
 | |
| 	var result []*collections.ReferenceCounter[[]byte]
 | |
| 	var packet []byte
 | |
| 	var counter *collections.ReferenceCounter[[]byte]
 | |
| 
 | |
| 	for _, payload := range payloads {
 | |
| 		// 保存开始序号
 | |
| 		track.StartSeq = track.Muxer.GetHeader().Seq
 | |
| 		track.Muxer.Input(payload, timestamp, func() []byte {
 | |
| 			counter = t.packetAllocator.Get()
 | |
| 			counter.Refer()
 | |
| 
 | |
| 			packet = counter.Get()
 | |
| 			// 预留rtp over tcp 4字节头部
 | |
| 			return packet[OverTcpHeaderSize:]
 | |
| 		}, func(bytes []byte) {
 | |
| 			track.EndSeq = track.Muxer.GetHeader().Seq
 | |
| 			// 每个包都存在rtp over tcp 4字节头部
 | |
| 			overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)]
 | |
| 			t.OverTCP(overTCPPacket, trackIndex)
 | |
| 
 | |
| 			counter.ResetData(overTCPPacket)
 | |
| 			result = append(result, counter)
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	// 引用计数保持为1
 | |
| 	for _, pkt := range result {
 | |
| 		pkt.Release()
 | |
| 	}
 | |
| 
 | |
| 	if t.HasVideo() && stream.AppConfig.GOPCache {
 | |
| 		// 遇到视频关键帧, 丢弃前一帧缓存
 | |
| 		if videoKey {
 | |
| 			for _, segment := range t.segments {
 | |
| 				for _, pkt := range segment.Data {
 | |
| 					pkt.Release()
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			t.segments = t.segments[:0]
 | |
| 		}
 | |
| 
 | |
| 		// 计数+1
 | |
| 		for _, pkt := range result {
 | |
| 			pkt.Refer()
 | |
| 		}
 | |
| 
 | |
| 		// 放在缓存末尾
 | |
| 		t.segments = append(t.segments, stream.TransStreamSegment{
 | |
| 			Data:  result,
 | |
| 			TS:    int64(timestamp),
 | |
| 			Key:   videoKey,
 | |
| 			Index: trackIndex,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
 | |
| 	// 恢复上次拉流的序号
 | |
| 	var startSeq uint16
 | |
| 	if t.lastEndSeq != nil {
 | |
| 		var ok bool
 | |
| 		startSeq, ok = t.lastEndSeq[track.Stream.CodecID]
 | |
| 		utils.Assert(ok)
 | |
| 	}
 | |
| 
 | |
| 	// 查找RTP封装器和PayloadType
 | |
| 	newMuxerFunc := rtp.SupportedCodecs[track.Stream.CodecID]
 | |
| 	utils.Assert(newMuxerFunc != nil)
 | |
| 
 | |
| 	// 创建RTP封装器
 | |
| 	muxer, payload := newMuxerFunc.(func(seq int, ssrc uint32) (rtp.Muxer, rtp.PayloadType))(int(startSeq), 0xFFFFFFFF)
 | |
| 
 | |
| 	// 创建track
 | |
| 	rtspTrack := NewRTSPTrack(muxer, payload, track.Stream.MediaType, track.Stream.CodecID)
 | |
| 	t.RtspTracks = append(t.RtspTracks, rtspTrack)
 | |
| 	trackIndex := len(t.RtspTracks) - 1
 | |
| 
 | |
| 	// 将sps和pps按照单一模式打包
 | |
| 	packAndAdd := func(data []byte) {
 | |
| 		packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0, true)
 | |
| 		utils.Assert(len(packets) == 1)
 | |
| 	}
 | |
| 
 | |
| 	if utils.AVMediaTypeVideo == track.Stream.MediaType {
 | |
| 		parameters := track.Stream.CodecParameters
 | |
| 		if utils.AVCodecIdH265 == track.Stream.CodecID {
 | |
| 			bytes := parameters.(*avformat.HEVCCodecData).VPS()
 | |
| 			packAndAdd(avc.RemoveStartCode(bytes[0]))
 | |
| 		}
 | |
| 
 | |
| 		spsBytes := parameters.SPS()
 | |
| 		ppsBytes := parameters.PPS()
 | |
| 		packAndAdd(avc.RemoveStartCode(spsBytes[0]))
 | |
| 		packAndAdd(avc.RemoveStartCode(ppsBytes[0]))
 | |
| 	}
 | |
| 
 | |
| 	return trackIndex, nil
 | |
| }
 | |
| 
 | |
| func (t *TransStream) Close() ([]stream.TransStreamSegment, error) {
 | |
| 	for _, track := range t.RtspTracks {
 | |
| 		if track != nil {
 | |
| 			track.Close()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| func (t *TransStream) WriteHeader() error {
 | |
| 	description := sdp.SessionDescription{
 | |
| 		Version: 0,
 | |
| 		Origin: sdp.Origin{
 | |
| 			Username:       "-",
 | |
| 			SessionID:      0,
 | |
| 			SessionVersion: 0,
 | |
| 			NetworkType:    "IN",
 | |
| 			AddressType:    t.addrType,
 | |
| 			UnicastAddress: t.addr.IP.String(),
 | |
| 		},
 | |
| 
 | |
| 		SessionName: "Stream",
 | |
| 		TimeDescriptions: []sdp.TimeDescription{{
 | |
| 			Timing: sdp.Timing{
 | |
| 				StartTime: 0,
 | |
| 				StopTime:  0,
 | |
| 			},
 | |
| 			RepeatTimes: nil,
 | |
| 		},
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	for i, track := range t.RtspTracks {
 | |
| 		mediaDescription := sdp.MediaDescription{
 | |
| 			ConnectionInformation: &sdp.ConnectionInformation{
 | |
| 				NetworkType: "IN",
 | |
| 				AddressType: t.addrType,
 | |
| 				Address:     &sdp.Address{Address: t.addr.IP.String()},
 | |
| 			},
 | |
| 
 | |
| 			Attributes: []sdp.Attribute{
 | |
| 				sdp.NewAttribute("recvonly", ""),
 | |
| 				sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
 | |
| 				sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", track.payload.Pt, track.payload.Encoding, track.payload.ClockRate), ""),
 | |
| 			},
 | |
| 		}
 | |
| 
 | |
| 		mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
 | |
| 		mediaDescription.MediaName.Formats = []string{strconv.Itoa(track.payload.Pt)}
 | |
| 
 | |
| 		if utils.AVMediaTypeAudio == track.MediaType {
 | |
| 			mediaDescription.MediaName.Media = "audio"
 | |
| 
 | |
| 			if utils.AVCodecIdAAC == track.CodecID {
 | |
| 				//[14496-3], [RFC6416] profile-level-id:
 | |
| 				//1 : Main Audio Profile Level 1
 | |
| 				//9 : Speech Audio Profile Level 1
 | |
| 				//15: High Quality Audio Profile Level 2
 | |
| 				//30: Natural Audio Profile Level 1
 | |
| 				//44: High Efficiency AAC Profile Level 2
 | |
| 				//48: High Efficiency AAC v2 Profile Level 2
 | |
| 				//55: Baseline MPEG Surround Profile (see ISO/IEC 23003-1) Level 3
 | |
| 
 | |
| 				//[RFC5619]
 | |
| 				//a=fmtp:96 streamType=5; profile-level-id=44; mode=AAC-hbr; config=131
 | |
| 				//     056E598; sizeLength=13; indexLength=3; indexDeltaLength=3; constant
 | |
| 				//     Duration=2048; MPS-profile-level-id=55; MPS-config=F1B4CF920442029B
 | |
| 				//     501185B6DA00;
 | |
| 				//低比特率用sizelength=6;indexlength=2;indexdeltalength=2
 | |
| 
 | |
| 				//[RFC3640]
 | |
| 				//mode=AAC-hbr
 | |
| 				fmtp := sdp.NewAttribute("fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;", "")
 | |
| 				mediaDescription.Attributes = append(mediaDescription.Attributes, fmtp)
 | |
| 			}
 | |
| 
 | |
| 		} else if utils.AVMediaTypeVideo == track.MediaType {
 | |
| 			mediaDescription.MediaName.Media = "video"
 | |
| 		}
 | |
| 
 | |
| 		description.MediaDescriptions = append(description.MediaDescriptions, &mediaDescription)
 | |
| 	}
 | |
| 
 | |
| 	marshal, err := description.Marshal()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	t.sdp = string(marshal)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[utils.AVCodecID]uint16) stream.TransStream {
 | |
| 	t := &TransStream{
 | |
| 		addr:            addr,
 | |
| 		urlFormat:       urlFormat,
 | |
| 		lastEndSeq:      oldTracks,
 | |
| 		packetAllocator: stream.NewRtpBuffer(512),
 | |
| 	}
 | |
| 
 | |
| 	if addr.IP.To4() != nil {
 | |
| 		t.addrType = "IP4"
 | |
| 	} else {
 | |
| 		t.addrType = "IP6"
 | |
| 	}
 | |
| 
 | |
| 	return t
 | |
| }
 | |
| 
 | |
| func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
 | |
| 	trackFormat := "?track=%d"
 | |
| 	var oldTracks map[utils.AVCodecID]uint16
 | |
| 	if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
 | |
| 		oldTracks = endInfo.RtspTracks
 | |
| 		if oldTracks != nil {
 | |
| 			for codecID, seq := range oldTracks {
 | |
| 				log.Sugar.Infof("track codecID: %s, seq: %d", codecID, seq)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return NewTransStream(net.IPAddr{
 | |
| 		IP:   net.ParseIP(stream.AppConfig.PublicIP),
 | |
| 		Zone: "",
 | |
| 	}, trackFormat, oldTracks), nil
 | |
| }
 | 
