Compare commits

...

3 Commits

Author SHA1 Message Date
ydajiang
7155b866c2 fix: 不支持的编码器仍然继续封装流的问题 2025-06-12 10:30:12 +08:00
ydajiang
1e982fce5f Merge branch 'dev' of https://gitee.com/lkmio/lkm into dev 2025-06-11 16:41:05 +08:00
ydajiang
93ea8f2fc2 refactor: 重构传输流的track管理 2025-06-11 16:40:58 +08:00
18 changed files with 192 additions and 175 deletions

View File

@@ -37,9 +37,9 @@ func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.
} }
} else if stream.TransStreamRtsp == transStream.GetProtocol() { } else if stream.TransStreamRtsp == transStream.GetProtocol() {
if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 { if rtsp := transStream.(*rtsp.TransStream); len(rtsp.Tracks) > 0 {
info.RtspTracks = make(map[byte]uint16, len(tracks)) info.RtspTracks = make(map[int]uint16, len(tracks))
for _, track := range rtsp.RtspTracks { for _, track := range rtsp.RtspTracks {
info.RtspTracks[track.PT] = track.EndSeq info.RtspTracks[int(track.CodecID)] = track.EndSeq
} }
} }
} else if stream.TransStreamFlv == transStream.GetProtocol() { } else if stream.TransStreamFlv == transStream.GetProtocol() {

View File

@@ -18,7 +18,7 @@ type TransStream struct {
flvExtraDataBlock []byte // metadata和sequence header flvExtraDataBlock []byte // metadata和sequence header
} }
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *TransStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer() t.ClearOutStreamBuffer()
var flvTagSize int var flvTagSize int
@@ -94,20 +94,19 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
} }
func (t *TransStream) AddTrack(track *stream.Track) error { func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
if err := t.BaseTransStream.AddTrack(track); err != nil { var index int
return err var err error
}
if utils.AVMediaTypeAudio == track.Stream.MediaType { if utils.AVMediaTypeAudio == track.Stream.MediaType {
t.Muxer.AddAudioTrack(track.Stream) index, err = t.Muxer.AddAudioTrack(track.Stream)
} else if utils.AVMediaTypeVideo == track.Stream.MediaType { } else if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.Muxer.AddVideoTrack(track.Stream) index, err = t.Muxer.AddVideoTrack(track.Stream)
t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width())) t.Muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters.Width()))
t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height())) t.Muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters.Height()))
} }
return nil
return index, err
} }
func (t *TransStream) WriteHeader() error { func (t *TransStream) WriteHeader() error {

View File

@@ -2,11 +2,9 @@ package gb28181
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"github.com/lkmio/avformat" "github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils" "github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream" "github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg" "github.com/lkmio/mpeg"
"github.com/lkmio/rtp" "github.com/lkmio/rtp"
@@ -17,49 +15,25 @@ type GBGateway struct {
ps *mpeg.PSMuxer ps *mpeg.PSMuxer
rtp rtp.Muxer rtp rtp.Muxer
psBuffer []byte psBuffer []byte
tracks map[utils.AVCodecID]int // codec->track index
rtpBuffer *stream.RtpBuffer rtpBuffer *stream.RtpBuffer
} }
func (s *GBGateway) WriteHeader() error { func (s *GBGateway) AddTrack(track *stream.Track) (int, error) {
if len(s.tracks) == 0 {
return fmt.Errorf("no tracks available")
}
return nil
}
func (s *GBGateway) AddTrack(track *stream.Track) error {
s.BaseTransStream.AddTrack(track)
if utils.AVCodecIdH264 == track.Stream.CodecID || utils.AVCodecIdH265 == track.Stream.CodecID || utils.AVCodecIdAAC == track.Stream.CodecID || utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
} else {
log.Sugar.Errorf("不支持的编码格式: %d", track.Stream.CodecID)
return nil
}
index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID) index, err := s.ps.AddTrack(track.Stream.MediaType, track.Stream.CodecID)
if err != nil { if err != nil {
log.Sugar.Error("添加%s到ps muxer失败", track.Stream.CodecID) return -1, err
return nil
} }
s.tracks[track.Stream.CodecID] = index return index, nil
return nil
} }
func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (s *GBGateway) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
trackIndex, ok := s.tracks[packet.CodecID]
if !ok {
log.Sugar.Errorf("未找到对应的track: %d", packet.CodecID)
return nil, 0, false, nil
}
dts := packet.ConvertDts(90000) dts := packet.ConvertDts(90000)
pts := packet.ConvertPts(90000) pts := packet.ConvertPts(90000)
data := packet.Data data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType { if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(s.BaseTransStream.Tracks[packet.Index].Stream, packet) data = avformat.AVCCPacket2AnnexB(s.FindTrackWithStreamIndex(packet.Index).Stream, packet)
} }
// 扩容ps buffer // 扩容ps buffer
@@ -67,7 +41,7 @@ func (s *GBGateway) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCo
s.psBuffer = make([]byte, len(data)*2) s.psBuffer = make([]byte, len(data)*2)
} }
n := s.ps.Input(s.psBuffer, trackIndex, packet.Key, data, &pts, &dts) n := s.ps.Input(s.psBuffer, index, packet.Key, data, &pts, &dts)
var result []*collections.ReferenceCounter[[]byte] var result []*collections.ReferenceCounter[[]byte]
var rtpBuffer []byte var rtpBuffer []byte
@@ -101,7 +75,6 @@ func NewGBGateway(ssrc uint32) *GBGateway {
ps: mpeg.NewPsMuxer(), ps: mpeg.NewPsMuxer(),
rtp: rtp.NewMuxer(96, 0, ssrc), rtp: rtp.NewMuxer(96, 0, ssrc),
psBuffer: make([]byte, 1024*1024*2), psBuffer: make([]byte, 1024*1024*2),
tracks: make(map[utils.AVCodecID]int),
rtpBuffer: stream.NewRtpBuffer(1024), rtpBuffer: stream.NewRtpBuffer(1024),
} }
} }

View File

@@ -17,7 +17,7 @@ func (s *TalkStream) WriteHeader() error {
return nil return nil
} }
func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (s *TalkStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
var size int var size int
s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte { s.muxer.Input(packet.Data, uint32(packet.Dts), func() []byte {
return s.packet return s.packet
@@ -26,7 +26,7 @@ func (s *TalkStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceC
}) })
packet = &avformat.AVPacket{Data: s.packet[:size]} packet = &avformat.AVPacket{Data: s.packet[:size]}
return s.RtpStream.Input(packet) return s.RtpStream.Input(packet, index)
} }
func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) { func NewTalkTransStream(ssrc uint32) (stream.TransStream, error) {

View File

@@ -39,7 +39,7 @@ type TransStream struct {
PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink PlaylistFormatPtrCounter []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
} }
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
// 创建一下个切片 // 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片 // 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
var newSegment bool var newSegment bool
@@ -64,7 +64,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
dts := packet.ConvertDts(90000) dts := packet.ConvertDts(90000)
data := packet.Data data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType { if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet) data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
} }
// 写入ts切片 // 写入ts切片
@@ -77,7 +77,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
} }
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize] bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
i += t.muxer.Input(bytes, packet.Index, data[i:], length, dts, pts, packet.Key, i == 0) i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
t.ctx.writeBufferSize += mpeg.TsPacketSize t.ctx.writeBufferSize += mpeg.TsPacketSize
} }
@@ -89,25 +89,20 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.Reference
return nil, -1, true, nil return nil, -1, true, nil
} }
func (t *TransStream) AddTrack(track *stream.Track) error { func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
var err error var err error
var trackIndex int
if utils.AVMediaTypeVideo == track.Stream.MediaType { if utils.AVMediaTypeVideo == track.Stream.MediaType {
data := track.Stream.CodecParameters.AnnexBExtraData() data := track.Stream.CodecParameters.AnnexBExtraData()
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data) trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
} else { } else {
_, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data) trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
} }
return err
return trackIndex, err
} }
func (t *TransStream) WriteHeader() error { func (t *TransStream) WriteHeader() error {
//if packet.Index >= t.muxer.TrackCount() {
// return nil, -1, false, fmt.Errorf("track not available")
//}
return t.createSegment() return t.createSegment()
} }

View File

@@ -67,7 +67,7 @@ func (h Handler) OnPacket(packet *avformat.AVPacket) {
h.fos.Write(h.buffer[:n]) h.fos.Write(h.buffer[:n])
} }
packets, _, _, err := h.gateway.Input(packet) packets, _, _, err := h.gateway.Input(packet, i)
if err != nil { if err != nil {
panic(err) panic(err)
} }

19
main.go
View File

@@ -2,11 +2,14 @@ package main
import ( import (
"encoding/json" "encoding/json"
flv2 "github.com/lkmio/flv"
"github.com/lkmio/lkm/flv" "github.com/lkmio/lkm/flv"
"github.com/lkmio/lkm/hls" "github.com/lkmio/lkm/hls"
"github.com/lkmio/lkm/jt1078" "github.com/lkmio/lkm/jt1078"
"github.com/lkmio/lkm/record" "github.com/lkmio/lkm/record"
"github.com/lkmio/lkm/rtsp" "github.com/lkmio/lkm/rtsp"
"github.com/lkmio/mpeg"
"github.com/lkmio/rtp"
"github.com/lkmio/transport" "github.com/lkmio/transport"
"os" "os"
"time" "time"
@@ -24,14 +27,14 @@ import (
) )
func init() { func init() {
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamGBCascaded, stream.GBCascadedTransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamGBTalk, gb28181.TalkTransStreamFactory, mpeg.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory) stream.RegisterTransStreamFactory(stream.TransStreamGBGateway, gb28181.GatewayTransStreamFactory, mpeg.SupportedCodecs)
stream.SetRecordStreamFactory(record.NewFLVFileSink) stream.SetRecordStreamFactory(record.NewFLVFileSink)
stream.StreamEndInfoBride = NewStreamEndInfo stream.StreamEndInfoBride = NewStreamEndInfo

View File

@@ -41,27 +41,11 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
tracks := transStream.GetTracks() tracks := transStream.GetTracks()
for index, track := range tracks { for index, track := range tracks {
var mimeType string
var id string var id string
codecId := track.Stream.CodecID codecId := track.Stream.CodecID
if utils.AVCodecIdH264 == codecId { mimeType, ok := SupportedCodecs[codecId]
mimeType = webrtc.MimeTypeH264 if !ok {
} else if utils.AVCodecIdH265 == codecId { log.Sugar.Errorf("unsupported codec: %s", codecId)
mimeType = webrtc.MimeTypeH265
} else if utils.AVCodecIdAV1 == codecId {
mimeType = webrtc.MimeTypeAV1
} else if utils.AVCodecIdVP8 == codecId {
mimeType = webrtc.MimeTypeVP8
} else if utils.AVCodecIdVP9 == codecId {
mimeType = webrtc.MimeTypeVP9
} else if utils.AVCodecIdOPUS == codecId {
mimeType = webrtc.MimeTypeOpus
} else if utils.AVCodecIdPCMALAW == codecId {
mimeType = webrtc.MimeTypePCMA
} else if utils.AVCodecIdPCMMULAW == codecId {
mimeType = webrtc.MimeTypePCMU
} else {
log.Sugar.Errorf("codec %s not compatible with webrtc", codecId)
continue continue
} }
@@ -71,7 +55,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
id = "video" id = "video"
} }
remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion") remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType.(string)}, id, "pion")
if err != nil { if err != nil {
return err return err
} }

View File

@@ -12,19 +12,31 @@ import (
var ( var (
webrtcApi *webrtc.API webrtcApi *webrtc.API
SupportedCodecs = map[utils.AVCodecID]interface{}{
utils.AVCodecIdH264: webrtc.MimeTypeH264,
utils.AVCodecIdH265: webrtc.MimeTypeH265,
utils.AVCodecIdAV1: webrtc.MimeTypeAV1,
utils.AVCodecIdVP8: webrtc.MimeTypeVP8,
utils.AVCodecIdVP9: webrtc.MimeTypeVP9,
utils.AVCodecIdOPUS: webrtc.MimeTypeOpus,
utils.AVCodecIdPCMALAW: webrtc.MimeTypePCMA,
utils.AVCodecIdPCMMULAW: webrtc.MimeTypePCMU,
utils.AVCodecIdADPCMG722: webrtc.MimeTypeG722,
}
) )
type transStream struct { type transStream struct {
stream.BaseTransStream stream.BaseTransStream
} }
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer() t.ClearOutStreamBuffer()
if utils.AVMediaTypeAudio == packet.MediaType { if utils.AVMediaTypeAudio == packet.MediaType {
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data)) t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet.Data))
} else if utils.AVMediaTypeVideo == packet.MediaType { } else if utils.AVMediaTypeVideo == packet.MediaType {
avStream := t.BaseTransStream.Tracks[packet.Index].Stream avStream := t.FindTrackWithStreamIndex(packet.Index).Stream
if packet.Key { if packet.Key {
extra := avStream.CodecParameters.AnnexBExtraData() extra := avStream.CodecParameters.AnnexBExtraData()
t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra)) t.AppendOutStreamBuffer(collections.NewReferenceCounter(extra))

View File

@@ -21,7 +21,7 @@ type transStream struct {
metaData *amf0.Object // 推流方携带的元数据 metaData *amf0.Object // 推流方携带的元数据
} }
func (t *transStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *transStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer() t.ClearOutStreamBuffer()
var data []byte var data []byte

View File

@@ -29,7 +29,7 @@ type TransStream struct {
RtspTracks []*Track RtspTracks []*Track
//oldTracks []*Track //oldTracks []*Track
oldTracks map[byte]uint16 oldTracks map[int]uint16
sdp string sdp string
rtpBuffer *stream.RtpBuffer rtpBuffer *stream.RtpBuffer
@@ -41,18 +41,18 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4)) binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
} }
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *TransStream) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
var ts uint32 var ts uint32
var result []*collections.ReferenceCounter[[]byte] var result []*collections.ReferenceCounter[[]byte]
track := t.RtspTracks[packet.Index] track := t.RtspTracks[trackIndex]
if utils.AVMediaTypeAudio == packet.MediaType { if utils.AVMediaTypeAudio == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate)) ts = uint32(packet.ConvertPts(track.payload.ClockRate))
result = t.PackRtpPayload(track, packet.Index, packet.Data, ts) result = t.PackRtpPayload(track, trackIndex, packet.Data, ts)
} else if utils.AVMediaTypeVideo == packet.MediaType { } else if utils.AVMediaTypeVideo == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate)) ts = uint32(packet.ConvertPts(track.payload.ClockRate))
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet) annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[trackIndex].Stream, packet)
data := avc.RemoveStartCode(annexBData) data := avc.RemoveStartCode(annexBData)
result = t.PackRtpPayload(track, packet.Index, data, ts) result = t.PackRtpPayload(track, trackIndex, data, ts)
} }
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
@@ -110,36 +110,24 @@ func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, tim
return result return result
} }
func (t *TransStream) AddTrack(track *stream.Track) error { func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
if err := t.BaseTransStream.AddTrack(track); err != nil {
return err
}
payloadType, ok := rtp.CodecIdPayloads[track.Stream.CodecID]
if !ok {
return fmt.Errorf("no payload type was found for codecid: %d", track.Stream.CodecID)
}
// 恢复上次拉流的序号 // 恢复上次拉流的序号
var startSeq uint16 var startSeq uint16
if t.oldTracks != nil { if t.oldTracks != nil {
startSeq, ok = t.oldTracks[byte(payloadType.Pt)] var ok bool
startSeq, ok = t.oldTracks[int(track.Stream.CodecID)]
utils.Assert(ok) utils.Assert(ok)
} }
// 创建RTP封装器 // 查找RTP封装器和PayloadType
var muxer rtp.Muxer newMuxerFunc := rtp.SupportedCodecs[track.Stream.CodecID]
if utils.AVCodecIdH264 == track.Stream.CodecID { utils.Assert(newMuxerFunc != nil)
muxer = rtp.NewH264Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdH265 == track.Stream.CodecID {
muxer = rtp.NewH265Muxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == track.Stream.CodecID {
muxer = rtp.NewAACMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
} else if utils.AVCodecIdPCMALAW == track.Stream.CodecID || utils.AVCodecIdPCMMULAW == track.Stream.CodecID {
muxer = rtp.NewMuxer(payloadType.Pt, int(startSeq), 0xFFFFFFFF)
}
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType) // 创建RTP封装器
muxer, payload := newMuxerFunc.(func(seq int, ssrc uint32) (rtp.Muxer, rtp.PayloadType))(int(startSeq), 0xFFFFFFFF)
// 创建track
rtspTrack := NewRTSPTrack(muxer, payload, track.Stream.MediaType, track.Stream.CodecID)
t.RtspTracks = append(t.RtspTracks, rtspTrack) t.RtspTracks = append(t.RtspTracks, rtspTrack)
trackIndex := len(t.RtspTracks) - 1 trackIndex := len(t.RtspTracks) - 1
@@ -170,7 +158,7 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
} }
return nil return trackIndex, nil
} }
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) { func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
@@ -206,8 +194,7 @@ func (t *TransStream) WriteHeader() error {
}, },
} }
for i, track := range t.Tracks { for i, track := range t.RtspTracks {
payloadType, _ := rtp.CodecIdPayloads[track.Stream.CodecID]
mediaDescription := sdp.MediaDescription{ mediaDescription := sdp.MediaDescription{
ConnectionInformation: &sdp.ConnectionInformation{ ConnectionInformation: &sdp.ConnectionInformation{
NetworkType: "IN", NetworkType: "IN",
@@ -218,17 +205,17 @@ func (t *TransStream) WriteHeader() error {
Attributes: []sdp.Attribute{ Attributes: []sdp.Attribute{
sdp.NewAttribute("recvonly", ""), sdp.NewAttribute("recvonly", ""),
sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""), sdp.NewAttribute("control:"+fmt.Sprintf(t.urlFormat, i), ""),
sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", payloadType.Pt, payloadType.Encoding, payloadType.ClockRate), ""), sdp.NewAttribute(fmt.Sprintf("rtpmap:%d %s/%d", track.payload.Pt, track.payload.Encoding, track.payload.ClockRate), ""),
}, },
} }
mediaDescription.MediaName.Protos = []string{"RTP", "AVP"} mediaDescription.MediaName.Protos = []string{"RTP", "AVP"}
mediaDescription.MediaName.Formats = []string{strconv.Itoa(payloadType.Pt)} mediaDescription.MediaName.Formats = []string{strconv.Itoa(track.payload.Pt)}
if utils.AVMediaTypeAudio == track.Stream.MediaType { if utils.AVMediaTypeAudio == track.MediaType {
mediaDescription.MediaName.Media = "audio" mediaDescription.MediaName.Media = "audio"
if utils.AVCodecIdAAC == track.Stream.CodecID { if utils.AVCodecIdAAC == track.CodecID {
//[14496-3], [RFC6416] profile-level-id: //[14496-3], [RFC6416] profile-level-id:
//1 : Main Audio Profile Level 1 //1 : Main Audio Profile Level 1
//9 : Speech Audio Profile Level 1 //9 : Speech Audio Profile Level 1
@@ -267,7 +254,7 @@ func (t *TransStream) WriteHeader() error {
return nil return nil
} }
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream { func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[int]uint16) stream.TransStream {
t := &TransStream{ t := &TransStream{
addr: addr, addr: addr,
urlFormat: urlFormat, urlFormat: urlFormat,
@@ -286,7 +273,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) { func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
trackFormat := "?track=%d" trackFormat := "?track=%d"
var oldTracks map[byte]uint16 var oldTracks map[int]uint16
if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil { if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil {
oldTracks = endInfo.RtspTracks oldTracks = endInfo.RtspTracks
} }

View File

@@ -8,11 +8,11 @@ import (
// Track rtsp每路输出流的封装 // Track rtsp每路输出流的封装
type Track struct { type Track struct {
PT byte payload rtp.PayloadType
Rate int
MediaType utils.AVMediaType MediaType utils.AVMediaType
StartSeq uint16 StartSeq uint16
EndSeq uint16 EndSeq uint16
CodecID utils.AVCodecID
Muxer rtp.Muxer Muxer rtp.Muxer
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用 ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
@@ -21,12 +21,12 @@ type Track struct {
func (r *Track) Close() { func (r *Track) Close() {
} }
func NewRTSPTrack(muxer rtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track { func NewRTSPTrack(muxer rtp.Muxer, payload rtp.PayloadType, mediaType utils.AVMediaType, id utils.AVCodecID) *Track {
stream := &Track{ stream := &Track{
PT: pt, payload: payload,
Rate: rate,
Muxer: muxer, Muxer: muxer,
MediaType: mediaType, MediaType: mediaType,
CodecID: id,
} }
return stream return stream

View File

@@ -12,11 +12,7 @@ type RtpStream struct {
rtpBuffer *RtpBuffer rtpBuffer *RtpBuffer
} }
func (f *RtpStream) WriteHeader() error { func (f *RtpStream) Input(packet *avformat.AVPacket, _ int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
return nil
}
func (f *RtpStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
size := 2 + uint16(len(packet.Data)) size := 2 + uint16(len(packet.Data))
if size > UDPReceiveBufferSize { if size > UDPReceiveBufferSize {
log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize) log.Sugar.Errorf("转发%s流失败 rtp包过长, 长度:%d, 最大允许:%d", f.Protocol, len(packet.Data), UDPReceiveBufferSize)

View File

@@ -21,7 +21,7 @@ type StreamEndInfo struct {
Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳 Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳
M3U8Writer M3U8Writer // 保存M3U8生成器 M3U8Writer M3U8Writer // 保存M3U8生成器
PlaylistFormat *string // M3U8播放列表 PlaylistFormat *string // M3U8播放列表
RtspTracks map[byte]uint16 // rtsp每路track的结束序号 RtspTracks map[int]uint16 // rtsp每路track的结束序号
FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size
} }

View File

@@ -2,6 +2,7 @@ package stream
import ( import (
"fmt" "fmt"
"github.com/lkmio/avformat/utils"
) )
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error)
@@ -17,12 +18,15 @@ func init() {
transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8) transStreamFactories = make(map[TransStreamProtocol]TransStreamFactory, 8)
} }
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) { func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory, supportedCodes map[utils.AVCodecID]interface{}) {
_, ok := transStreamFactories[protocol] _, ok := transStreamFactories[protocol]
if ok { if ok {
panic(fmt.Sprintf("%s has been registered", protocol.String())) panic(fmt.Sprintf("%s has been registered", protocol.String()))
} else if _, ok = SupportedCodes[protocol]; ok {
panic(fmt.Sprintf("%s has been registered", protocol.String()))
} }
SupportedCodes[protocol] = supportedCodes
transStreamFactories[protocol] = streamFunc transStreamFactories[protocol] = streamFunc
} }

View File

@@ -1,6 +1,7 @@
package stream package stream
import ( import (
"fmt"
"github.com/lkmio/avformat" "github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils" "github.com/lkmio/avformat/utils"
@@ -190,7 +191,8 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
id := GenerateTransStreamID(TransStreamHls, streams...) id := GenerateTransStreamID(TransStreamHls, streams...)
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil) hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
if err != nil { if err != nil {
panic(err) log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error())
return
} }
t.DispatchGOPBuffer(hlsStream) t.DispatchGOPBuffer(hlsStream)
@@ -199,14 +201,6 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
} }
} }
func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool {
if TransStreamRtmp == protocol || TransStreamFlv == protocol {
}
return true
}
func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) { func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) {
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source) log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source)
@@ -214,16 +208,35 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
utils.Assert(source != nil) utils.Assert(source != nil)
transStream, err := CreateTransStream(source, protocol, tracks, sink) transStream, err := CreateTransStream(source, protocol, tracks, sink)
if err != nil { if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), t.source)
return nil, err return nil, err
} }
for _, track := range tracks { for _, track := range tracks {
supportedCodecs, ok := SupportedCodes[protocol]
if !ok {
panic(fmt.Sprintf("unknown protocol %s", protocol.String()))
}
_, ok = supportedCodecs[track.Stream.CodecID]
if !ok {
log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID)
continue
}
var index int
// 重新拷贝一个track传输流内部使用track的时间戳 // 重新拷贝一个track传输流内部使用track的时间戳
newTrack := *track newTrack := *track
if err = transStream.AddTrack(&newTrack); err != nil { if index, err = transStream.AddTrack(&newTrack); err != nil {
return nil, err log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID)
continue
} }
// stream index->muxer track index
transStream.SetMuxerTrack(index, &newTrack)
}
if transStream.TrackSize() == 0 {
return nil, fmt.Errorf("not found track")
} }
transStream.SetID(id) transStream.SetID(id)
@@ -238,7 +251,7 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
t.forwardTransStream = transStream t.forwardTransStream = transStream
} }
return transStream, err return transStream, nil
} }
func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) { func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
@@ -251,12 +264,17 @@ func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
// DispatchPacket 分发AVPacket // DispatchPacket 分发AVPacket
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) { func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) {
data, timestamp, videoKey, err := transStream.Input(packet) trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index)
if !ok {
return
}
data, timestamp, videoKey, err := transStream.Input(packet, trackIndex)
if err != nil || len(data) < 1 { if err != nil || len(data) < 1 {
return return
} }
t.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey) t.DispatchBuffer(transStream, trackIndex, data, timestamp, videoKey)
} }
// DispatchBuffer 分发传输流 // DispatchBuffer 分发传输流
@@ -321,7 +339,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
} }
// 不支持对期望编码的流封装. 降级 // 不支持对期望编码的流封装. 降级
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) { if utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId {
audioCodecId = utils.AVCodecIdNONE audioCodecId = utils.AVCodecIdNONE
videoCodecId = utils.AVCodecIdNONE videoCodecId = utils.AVCodecIdNONE
} }
@@ -370,14 +388,14 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
{ {
sink.Lock() sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.GetState() { if SessionStateClosed == sink.GetState() {
sink.UnLock()
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String()) log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
return false return false
} else { } else {
sink.SetState(SessionStateTransferring) sink.SetState(SessionStateTransferring)
} }
sink.UnLock()
} }
err := sink.StartStreaming(transStream) err := sink.StartStreaming(transStream)
@@ -408,13 +426,13 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
// 发送已有的缓存数据 // 发送已有的缓存数据
// 此处发送缓存数据必须要存在关键帧的输出流才发否则等DispatchPacket时再发送extra。 // 此处发送缓存数据必须要存在关键帧的输出流才发否则等DispatchPacket时再发送extra。
data, timestamp, _ := transStream.ReadKeyFrameBuffer() keyBuffer, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(data) > 0 { if len(keyBuffer) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
t.write(sink, 0, extraData, timestamp, false) t.write(sink, 0, extraData, timestamp, false)
} }
t.write(sink, 0, data, timestamp, true) t.write(sink, 0, keyBuffer, timestamp, true)
} }
// 新建传输流,发送已经缓存的音视频帧 // 新建传输流,发送已经缓存的音视频帧

View File

@@ -6,6 +6,10 @@ import (
"github.com/lkmio/avformat/utils" "github.com/lkmio/avformat/utils"
) )
var (
SupportedCodes = map[TransStreamProtocol]map[utils.AVCodecID]interface{}{}
)
// TransStream 将AVPacket封装成传输流 // TransStream 将AVPacket封装成传输流
type TransStream interface { type TransStream interface {
GetID() TransStreamID GetID() TransStreamID
@@ -13,14 +17,20 @@ type TransStream interface {
SetID(id TransStreamID) SetID(id TransStreamID)
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧 // Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) Input(packet *avformat.AVPacket, trackIndex int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error)
AddTrack(track *Track) error AddTrack(track *Track) (int, error)
SetMuxerTrack(muxerIndex int, track *Track)
FindMuxerTrackIndex(streamIndex int) (int, bool)
TrackSize() int TrackSize() int
GetTracks() []*Track GetTracks() []*Track
FindTrackWithStreamIndex(streamIndex int) *Track
// WriteHeader track添加完毕, 通过调用此函数告知 // WriteHeader track添加完毕, 通过调用此函数告知
WriteHeader() error WriteHeader() error
@@ -48,6 +58,7 @@ type TransStream interface {
type BaseTransStream struct { type BaseTransStream struct {
ID TransStreamID ID TransStreamID
Tracks []*Track Tracks []*Track
MuxerIndex map[int]int // stream index->muxer track index
Completed bool Completed bool
ExistVideo bool ExistVideo bool
Protocol TransStreamProtocol Protocol TransStreamProtocol
@@ -64,15 +75,47 @@ func (t *BaseTransStream) SetID(id TransStreamID) {
t.ID = id t.ID = id
} }
func (t *BaseTransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { func (t *BaseTransStream) WriteHeader() error {
return nil
}
func (t *BaseTransStream) Input(trackIndex, packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
return nil, -1, false, nil return nil, -1, false, nil
} }
func (t *BaseTransStream) AddTrack(track *Track) error { func (t *BaseTransStream) AddTrack(track *Track) (int, error) {
return len(t.Tracks), nil
}
func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) {
t.Tracks = append(t.Tracks, track) t.Tracks = append(t.Tracks, track)
if utils.AVMediaTypeVideo == track.Stream.MediaType { if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.ExistVideo = true t.ExistVideo = true
} }
if t.MuxerIndex == nil {
t.MuxerIndex = make(map[int]int)
}
// 如果muxerIndex为-1, 意味着复用器封装流时并不需要指定track index, 比如flv.
if muxerIndex > -1 {
t.MuxerIndex[track.Stream.Index] = muxerIndex
} else {
t.MuxerIndex[track.Stream.Index] = len(t.Tracks) - 1
}
}
func (t *BaseTransStream) FindMuxerTrackIndex(streamIndex int) (int, bool) {
index, ok := t.MuxerIndex[streamIndex]
return index, ok
}
func (t *BaseTransStream) FindTrackWithStreamIndex(streamIndex int) *Track {
for _, track := range t.Tracks {
if track.Stream.Index == streamIndex {
return track
}
}
return nil return nil
} }

View File

@@ -21,11 +21,12 @@ func init() {
int(utils.AVCodecIdVP8): 0x5, int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6, int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101, int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102, int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103, int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104, int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105, int(utils.AVCodecIdPCMMULAW): 105,
int(utils.AVCodecIdADPCMG722): 106,
} }
} }
@@ -58,7 +59,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
for i, track := range tracks { for i, track := range tracks {
id, ok := narrowCodecIds[int(track.Stream.CodecID)] id, ok := narrowCodecIds[int(track.Stream.CodecID)]
utils.Assert(ok) if ok {
id = byte(track.Stream.CodecID)
}
streamId |= uint64(id) << (48 - i*8) streamId |= uint64(id) << (48 - i*8)
} }