refactor: 重构传输流的track管理

This commit is contained in:
ydajiang
2025-06-11 16:40:58 +08:00
parent 668ad3aca5
commit 93ea8f2fc2
18 changed files with 189 additions and 165 deletions

View File

@@ -37,9 +37,9 @@ func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.
}
} else if stream.TransStreamRtsp == transStream.GetProtocol() {
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
info.RtspTracks = make(map[byte]uint16, len(tracks))
info.RtspTracks = make(map[int]uint16, len(tracks))
for _, track := range rtsp.RtspTracks {
info.RtspTracks[track.PT] = track.EndSeq
info.RtspTracks[int(track.CodecID)] = track.EndSeq
}
}
} else if stream.TransStreamFlv == transStream.GetProtocol() {

View File

@@ -18,7 +18,7 @@ type TransStream struct {
flvExtraDataBlock []byte // metadata和sequence header
}
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer()
var flvTagSize int
@@ -94,20 +94,19 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
}
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
var index int
var err error
if utils.AVMediaTypeAudio == track.Stream.MediaType {
t.Muxer.AddAudioTrack(track.Stream)
index, err = t.Muxer.AddAudioTrack(track.Stream)
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.Muxer.AddVideoTrack(track.Stream)
index, err = t.Muxer.AddVideoTrack(track.Stream)
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
}
return nil
return index, err
}
func (t *TransStream) WriteHeader() error {

View File

@@ -2,11 +2,9 @@ package gb28181
import (
"encoding/binary"
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"github.com/lkmio/rtp"
@@ -17,49 +15,25 @@ type GBGateway struct {
ps *mpeg.PSMuxer
rtp rtp.Muxer
psBuffer []byte
tracks map[utils.AVCodecID]int // codec->track index
rtpBuffer *stream.RtpBuffer
}
func (s *GBGateway) WriteHeader() error {
if len(s.tracks) == 0 {
return fmt.Errorf("no tracks available")
}
return nil
}
func (s *GBGateway) AddTrack(track *stream.Track) error {
s.BaseTransStream.AddTrack(track)
if utils.AVCodecIdH264 == track.Stream.CodecID || utils.AVCodecIdH265 == track.Stream.CodecID || utils.AVCodecIdAAC == track.Stream.CodecID || utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
} else {
log.Sugar.Errorf("不支持的编码格式: %d", track.Stream.CodecID)
return nil
}
func (s *GBGateway) AddTrack(track *stream.Track) (int, error) {
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
if err != nil {
log.Sugar.Error("添加%s到ps muxer失败", track.Stream.CodecID)
return nil
return -1, err
}
s.tracks[track.Stream.CodecID] = index
return nil
return index, nil
}
func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
trackIndex, ok := s.tracks[packet.CodecID]
if !ok {
log.Sugar.Errorf("未找到对应的track: %d", packet.CodecID)
return nil, 0, false, nil
}
func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
dts := packet.ConvertDts(90000)
pts := packet.ConvertPts(90000)
data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(s.BaseTransStream.Tracks[packet.Index].Stream, packet)
data = avformat.AVCCPacket2AnnexB(s.FindTrackWithStreamIndex(packet.Index).Stream, packet)
}
// 扩容ps buffer
@@ -67,7 +41,7 @@ func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCo
s.psBuffer = make([]byte, len(data)*2)
}
n := s.ps.Input(s.psBuffer, trackIndex, packet.Key, data, &pts, &dts)
n := s.ps.Input(s.psBuffer, index, packet.Key, data, &pts, &dts)
var result []*collections.ReferenceCounter[[]byte]
var rtpBuffer []byte
@@ -101,7 +75,6 @@ func NewGBGateway(ssrc uint32) *GBGateway {
ps: mpeg.NewPsMuxer(),
rtp: rtp.NewMuxer(96, 0, ssrc),
psBuffer: make([]byte, 1024*1024*2),
tracks: make(map[utils.AVCodecID]int),
rtpBuffer: stream.NewRtpBuffer(1024),
}
}

View File

@@ -17,7 +17,7 @@ func (s *TalkStream) WriteHeader() error {
return nil
}
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (s *TalkStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
var size int
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
return s.packet
@@ -26,7 +26,7 @@ func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceC
})
packet = &avformat.AVPacket{Data: s.packet[:size]}
return s.RtpStream.Input(packet)
return s.RtpStream.Input(packet, index)
}
func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) {

View File

@@ -39,7 +39,7 @@ type TransStream struct {
PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
}
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
// 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
var newSegment bool
@@ -64,7 +64,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
dts := packet.ConvertDts(90000)
data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
}
// 写入ts切片
@@ -77,7 +77,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
}
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
i += t.muxer.Input(bytes, packet.Index, data[i:], length, dts, pts, packet.Key, i == 0)
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
t.ctx.writeBufferSize += mpeg.TsPacketSize
}
@@ -89,25 +89,20 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
return nil, -1, true, nil
}
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
var err error
var trackIndex int
if utils.AVMediaTypeVideo == track.Stream.MediaType {
data := track.Stream.CodecParameters.AnnexBExtraData()
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
} else {
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
}
return err
return trackIndex, err
}
func (t *TransStream) WriteHeader() error {
//if packet.Index >= t.muxer.TrackCount() {
// return nil, -1, false, fmt.Errorf("track not available")
//}
return t.createSegment()
}

View File

@@ -67,7 +67,7 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) {
h.fos.Write(h.buffer[:n])
}
packets, _, _, err := h.gateway.Input(packet)
packets, _, _, err := h.gateway.Input(packet, i)
if err != nil {
panic(err)
}

19
main.go
View File

@@ -2,11 +2,14 @@ package main
import (
"encoding/json"
flv2 "github.com/lkmio/flv"
"github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/jt1078"
"github.com/lkmio/lkm/record"
"github.com/lkmio/lkm/rtsp"
"github.com/lkmio/mpeg"
"github.com/lkmio/rtp"
"github.com/lkmio/transport"
"os"
"time"
@@ -24,14 +27,14 @@ import (
)
func init() {
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory)
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory, mpeg.SupportedCodecs)
stream.SetRecordStreamFactory(record.NewFLVFileSink)
stream.StreamEndInfoBride = NewStreamEndInfo

View File

@@ -41,27 +41,11 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
tracks := transStream.GetTracks()
for index, track := range tracks {
var mimeType string
var id string
codecId := track.Stream.CodecID
if utils.AVCodecIdH264 == codecId {
mimeType = webrtc.MimeTypeH264
} else if utils.AVCodecIdH265 == codecId {
mimeType = webrtc.MimeTypeH265
} else if utils.AVCodecIdAV1 == codecId {
mimeType = webrtc.MimeTypeAV1
} else if utils.AVCodecIdVP8 == codecId {
mimeType = webrtc.MimeTypeVP8
} else if utils.AVCodecIdVP9 == codecId {
mimeType = webrtc.MimeTypeVP9
} else if utils.AVCodecIdOPUS == codecId {
mimeType = webrtc.MimeTypeOpus
} else if utils.AVCodecIdPCMALAW == codecId {
mimeType = webrtc.MimeTypePCMA
} else if utils.AVCodecIdPCMMULAW == codecId {
mimeType = webrtc.MimeTypePCMU
} else {
log.Sugar.Errorf("codec %s not compatible with webrtc", codecId)
mimeType, ok := SupportedCodecs[codecId]
if !ok {
log.Sugar.Errorf("unsupported codec: %s", codecId)
continue
}
@@ -71,7 +55,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
id = "video"
}
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType.(string)}, id, "pion")
if err != nil {
return err
}

View File

@@ -12,19 +12,31 @@ import (
var (
webrtcApi *webrtc.API
SupportedCodecs = map[utils.AVCodecID]interface{}{
utils.AVCodecIdH264: webrtc.MimeTypeH264,
utils.AVCodecIdH265: webrtc.MimeTypeH265,
utils.AVCodecIdAV1: webrtc.MimeTypeAV1,
utils.AVCodecIdVP8: webrtc.MimeTypeVP8,
utils.AVCodecIdVP9: webrtc.MimeTypeVP9,
utils.AVCodecIdOPUS: webrtc.MimeTypeOpus,
utils.AVCodecIdPCMALAW: webrtc.MimeTypePCMA,
utils.AVCodecIdPCMMULAW: webrtc.MimeTypePCMU,
utils.AVCodecIdADPCMG722: webrtc.MimeTypeG722,
}
)
type transStream struct {
stream.BaseTransStream
}
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer()
if utils.AVMediaTypeAudio == packet.MediaType {
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data))
} else if utils.AVMediaTypeVideo == packet.MediaType {
avStream := t.BaseTransStream.Tracks[packet.Index].Stream
avStream := t.FindTrackWithStreamIndex(packet.Index).Stream
if packet.Key {
extra := avStream.CodecParameters.AnnexBExtraData()
t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra))

View File

@@ -21,7 +21,7 @@ type transStream struct {
metaData *amf0.Object // 推流方携带的元数据
}
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer()
var data []byte

View File

@@ -29,7 +29,7 @@ type TransStream struct {
RtspTracks []*Track
//oldTracks []*Track
oldTracks map[byte]uint16
oldTracks map[int]uint16
sdp string
rtpBuffer *stream.RtpBuffer
@@ -41,18 +41,18 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
var ts uint32
var result []*collections.ReferenceCounter[[]byte]
track := t.RtspTracks[packet.Index]
track := t.RtspTracks[trackIndex]
if utils.AVMediaTypeAudio == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
result = t.PackRtpPayload(track, packet.Index, packet.Data, ts)
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
result = t.PackRtpPayload(track, trackIndex, packet.Data, ts)
} else if utils.AVMediaTypeVideo == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
ts = uint32(packet.ConvertPts(track.payload.ClockRate))
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
data := avc.RemoveStartCode(annexBData)
result = t.PackRtpPayload(track, packet.Index, data, ts)
result = t.PackRtpPayload(track, trackIndex, data, ts)
}
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
@@ -110,36 +110,24 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim
return result
}
func (t *TransStream) AddTrack(track *stream.Track) error {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
payloadType, ok := rtp.CodecIdPayloads[track.Stream.CodecID]
if !ok {
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecID)
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
// 恢复上次拉流的序号
var startSeq uint16
if t.oldTracks != nil {
startSeq, ok = t.oldTracks[byte(payloadType.Pt)]
var ok bool
startSeq, ok = t.oldTracks[int(track.Stream.CodecID)]
utils.Assert(ok)
}
// 创建RTP封装器
var muxer rtp.Muxer
if utils.AVCodecIdH264 == track.Stream.CodecID {
muxer = rtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == track.Stream.CodecID {
muxer = rtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == track.Stream.CodecID {
muxer = rtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
muxer = rtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
}
// 查找RTP封装器和PayloadType
newMuxerFunc := rtp.SupportedCodecs[track.Stream.CodecID]
utils.Assert(newMuxerFunc != nil)
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType)
// 创建RTP封装器
muxer, payload := newMuxerFunc.(func(seq int, ssrc uint32) (rtp.Muxer, rtp.PayloadType))(int(startSeq), 0xFFFFFFFF)
// 创建track
rtspTrack := NewRTSPTrack(muxer, payload, track.Stream.MediaType, track.Stream.CodecID)
t.RtspTracks = append(t.RtspTracks, rtspTrack)
trackIndex := len(t.RtspTracks) - 1
@@ -170,7 +158,7 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
}
return nil
return trackIndex, nil
}
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
@@ -206,8 +194,7 @@ func (t *TransStream) WriteHeader() error {
},
}
for i, track := range t.Tracks {
payloadType, _ := rtp.CodecIdPayloads[track.Stream.CodecID]
for i, track := range t.RtspTracks {
mediaDescription := sdp.MediaDescription{
ConnectionInformation: &sdp.ConnectionInformation{
NetworkType: "IN",
@@ -218,17 +205,17 @@ func (t *TransStream) WriteHeader() error {
Attributes: []sdp.Attribute{
sdp.NewAttribute("recvonly", ""),
sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", payloadType.Pt, payloadType.Encoding, payloadType.ClockRate), ""),
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", track.payload.Pt, track.payload.Encoding, track.payload.ClockRate), ""),
},
}
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)}
mediaDescription.MediaName.Formats = []string{strconv.Itoa(track.payload.Pt)}
if utils.AVMediaTypeAudio == track.Stream.MediaType {
if utils.AVMediaTypeAudio == track.MediaType {
mediaDescription.MediaName.Media = "audio"
if utils.AVCodecIdAAC == track.Stream.CodecID {
if utils.AVCodecIdAAC == track.CodecID {
//[14496-3], [RFC6416] profile-level-id:
//1 : Main Audio Profile Level 1
//9 : Speech Audio Profile Level 1
@@ -267,7 +254,7 @@ func (t *TransStream) WriteHeader() error {
return nil
}
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream {
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) stream.TransStream {
t := &TransStream{
addr: addr,
urlFormat: urlFormat,
@@ -286,7 +273,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
trackFormat := "?track=%d"
var oldTracks map[byte]uint16
var oldTracks map[int]uint16
if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
oldTracks = endInfo.RtspTracks
}

View File

@@ -8,11 +8,11 @@ import (
// Track rtsp每路输出流的封装
type Track struct {
PT byte
Rate int
payload rtp.PayloadType
MediaType utils.AVMediaType
StartSeq uint16
EndSeq uint16
CodecID utils.AVCodecID
Muxer rtp.Muxer
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
@@ -21,12 +21,12 @@ type Track struct {
func (r *Track) Close() {
}
func NewRTSPTrack(muxer rtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track {
stream := &Track{
PT: pt,
Rate: rate,
payload: payload,
Muxer: muxer,
MediaType: mediaType,
CodecID: id,
}
return stream

View File

@@ -12,11 +12,7 @@ type RtpStream struct {
rtpBuffer *RtpBuffer
}
func (f *RtpStream) WriteHeader() error {
return nil
}
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (f *RtpStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
size := 2 + uint16(len(packet.Data))
if size > UDPReceiveBufferSize {
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)

View File

@@ -21,7 +21,7 @@ type StreamEndInfo struct {
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
M3U8Writer M3U8Writer // 保存M3U8生成器
PlaylistFormat *string // M3U8播放列表
RtspTracks map[byte]uint16 // rtsp每路track的结束序号
RtspTracks map[int]uint16 // rtsp每路track的结束序号
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
}

View File

@@ -2,6 +2,7 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/utils"
)
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error)
@@ -17,12 +18,15 @@ func init() {
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
}
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) {
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory, supportedCodes map[utils.AVCodecID]interface{}) {
_, ok := transStreamFactories[protocol]
if ok {
panic(fmt.Sprintf("%s has been registered", protocol.String()))
} else if _, ok = SupportedCodes[protocol]; ok {
panic(fmt.Sprintf("%s has been registered", protocol.String()))
}
SupportedCodes[protocol] = supportedCodes
transStreamFactories[protocol] = streamFunc
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
@@ -190,7 +191,8 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
id := GenerateTransStreamID(TransStreamHls, streams...)
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
if err != nil {
panic(err)
log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error())
return
}
t.DispatchGOPBuffer(hlsStream)
@@ -214,16 +216,34 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
utils.Assert(source != nil)
transStream, err := CreateTransStream(source, protocol, tracks, sink)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), t.source)
return nil, err
}
for _, track := range tracks {
supportedCodecs, ok := SupportedCodes[protocol]
if !ok {
panic(fmt.Sprintf("unknown protocol %s", protocol.String()))
}
_, ok = supportedCodecs[track.Stream.CodecID]
if !ok {
log.Sugar.Warnf("不支持的编码器 %s %s", protocol.String(), track.Stream.CodecID)
}
var index int
// 重新拷贝一个track传输流内部使用track的时间戳
newTrack := *track
if err = transStream.AddTrack(&newTrack); err != nil {
return nil, err
if index, err = transStream.AddTrack(&newTrack); err != nil {
log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID)
continue
}
// stream index->muxer track index
transStream.SetMuxerTrack(index, &newTrack)
}
if transStream.TrackSize() == 0 {
return nil, fmt.Errorf("not found track")
}
transStream.SetID(id)
@@ -251,12 +271,17 @@ func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
// DispatchPacket 分发AVPacket
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
data, timestamp, videoKey, err := transStream.Input(packet)
trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index)
if !ok {
return
}
data, timestamp, videoKey, err := transStream.Input(packet, trackIndex)
if err != nil || len(data) < 1 {
return
}
t.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey)
t.DispatchBuffer(transStream, trackIndex, data, timestamp, videoKey)
}
// DispatchBuffer 分发传输流
@@ -370,14 +395,14 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
{
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.GetState() {
sink.UnLock()
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
return false
} else {
sink.SetState(SessionStateTransferring)
}
sink.UnLock()
}
err := sink.StartStreaming(transStream)
@@ -408,13 +433,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
// 发送已有的缓存数据
// 此处发送缓存数据必须要存在关键帧的输出流才发否则等DispatchPacket时再发送extra。
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(data) > 0 {
keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(keyBuffer) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
t.write(sink, 0, extraData, timestamp, false)
}
t.write(sink, 0, data, timestamp, true)
t.write(sink, 0, keyBuffer, timestamp, true)
}
// 新建传输流,发送已经缓存的音视频帧

View File

@@ -6,6 +6,10 @@ import (
"github.com/lkmio/avformat/utils"
)
var (
SupportedCodes = map[TransStreamProtocol]map[utils.AVCodecID]interface{}{}
)
// TransStream 将AVPacket封装成传输流
type TransStream interface {
GetID() TransStreamID
@@ -13,14 +17,20 @@ type TransStream interface {
SetID(id TransStreamID)
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
AddTrack(track *Track) error
AddTrack(track *Track) (int, error)
SetMuxerTrack(muxerIndex int, track *Track)
FindMuxerTrackIndex(streamIndex int) (int, bool)
TrackSize() int
GetTracks() []*Track
FindTrackWithStreamIndex(streamIndex int) *Track
// WriteHeader track添加完毕, 通过调用此函数告知
WriteHeader() error
@@ -48,6 +58,7 @@ type TransStream interface {
type BaseTransStream struct {
ID TransStreamID
Tracks []*Track
MuxerIndex map[int]int // stream index->muxer track index
Completed bool
ExistVideo bool
Protocol TransStreamProtocol
@@ -64,15 +75,47 @@ func (t *BaseTransStream) SetID(id TransStreamID) {
t.ID = id
}
func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
func (t *BaseTransStream) WriteHeader() error {
return nil
}
func (t *BaseTransStream) Input(trackIndex, packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
return nil, -1, false, nil
}
func (t *BaseTransStream) AddTrack(track *Track) error {
func (t *BaseTransStream) AddTrack(track *Track) (int, error) {
return len(t.Tracks), nil
}
func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) {
t.Tracks = append(t.Tracks, track)
if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.ExistVideo = true
}
if t.MuxerIndex == nil {
t.MuxerIndex = make(map[int]int)
}
// 如果muxerIndex为-1, 意味着复用器封装流时并不需要指定track index, 比如flv.
if muxerIndex > -1 {
t.MuxerIndex[track.Stream.Index] = muxerIndex
} else {
t.MuxerIndex[track.Stream.Index] = len(t.Tracks) - 1
}
}
func (t *BaseTransStream) FindMuxerTrackIndex(streamIndex int) (int, bool) {
index, ok := t.MuxerIndex[streamIndex]
return index, ok
}
func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track {
for _, track := range t.Tracks {
if track.Stream.Index == streamIndex {
return track
}
}
return nil
}

View File

@@ -21,11 +21,12 @@ func init() {
int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105,
int(utils.AVCodecIdADPCMG722): 106,
}
}
@@ -58,7 +59,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
for i, track := range tracks {
id, ok := narrowCodecIds[int(track.Stream.CodecID)]
utils.Assert(ok)
if ok {
id = byte(track.Stream.CodecID)
}
streamId |= uint64(id) << (48 - i*8)
}