feat: 支持音频转码

This commit is contained in:
ydajiang
2025-07-03 17:27:45 +08:00
parent 7155b866c2
commit 67050bf9b9
15 changed files with 423 additions and 110 deletions

View File

@@ -121,7 +121,7 @@ func (t *TransStream) WriteHeader() error {
writeSeparator(t.flvHeaderBlock) writeSeparator(t.flvHeaderBlock)
writeSeparator(t.flvExtraDataBlock) writeSeparator(t.flvExtraDataBlock)
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) t.MWBuffer = stream.NewMergeWritingBuffer(t.HasVideo())
return nil return nil
} }

View File

@@ -198,7 +198,7 @@ func modifySSRC(data []byte, ssrc uint32) {
// 使用wireshark直接导出的rtp流 // 使用wireshark直接导出的rtp流
// 根据ssrc来查找每个rtp包, rtp不要带扩展字段 // 根据ssrc来查找每个rtp包, rtp不要带扩展字段
func TestPublish(t *testing.T) { func TestPublish(t *testing.T) {
path := "../../source_files/gb28181_h264.rtp" path := "../../source_files/gb28181_tcp_h264_pcma.raw"
var rawSsrc uint32 = 0xBEBC201 var rawSsrc uint32 = 0xBEBC201
localAddr := "0.0.0.0:20001" localAddr := "0.0.0.0:20001"
id := "hls_mystream" id := "hls_mystream"
@@ -319,12 +319,14 @@ func TestPublish(t *testing.T) {
func TestDecode(t *testing.T) { func TestDecode(t *testing.T) {
t.Run("decode_raw", func(t *testing.T) { t.Run("decode_raw", func(t *testing.T) {
file, err2 := os.ReadFile("../dump/gb28181-192.168.2.103.37841") file, err2 := os.ReadFile("../../source_files/gb28181-114.103.207.33.20872")
if err2 != nil { if err2 != nil {
panic(err2) panic(err2)
} }
filter := NewSingleFilter(NewPassiveSource()) source := NewPassiveSource()
source.Init()
filter := NewSingleFilter(source)
session := NewTCPSession(nil, filter) session := NewTCPSession(nil, filter)
reader := bufio.NewBytesReader(file) reader := bufio.NewBytesReader(file)

13
go.mod
View File

@@ -1,11 +1,12 @@
module github.com/lkmio/lkm module github.com/lkmio/lkm
require ( require (
github.com/lkmio/mpeg v0.0.0 github.com/lkmio/audio-transcoder v0.0.0-20250702123727-6c54138868c6
github.com/lkmio/flv v0.0.0 github.com/lkmio/flv v0.0.0
github.com/lkmio/mpeg v0.0.0
github.com/lkmio/rtmp v0.0.0 github.com/lkmio/rtmp v0.0.0
github.com/lkmio/transport v0.0.0
github.com/lkmio/rtp v0.0.0 github.com/lkmio/rtp v0.0.0
github.com/lkmio/transport v0.0.0
) )
require ( require (
@@ -14,13 +15,16 @@ require (
github.com/lkmio/avformat v0.0.0 github.com/lkmio/avformat v0.0.0
github.com/natefinch/lumberjack v2.0.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pion/interceptor v0.1.25 github.com/pion/interceptor v0.1.25
github.com/pion/rtcp v1.2.12
github.com/pion/rtp v1.8.5
github.com/pion/sdp/v3 v3.0.8
github.com/pion/webrtc/v3 v3.2.29 github.com/pion/webrtc/v3 v3.2.29
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
github.com/x-cray/logrus-prefixed-formatter v0.5.2 github.com/x-cray/logrus-prefixed-formatter v0.5.2
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
) )
require ( require (
github.com/BurntSushi/toml v1.3.2 // indirect github.com/BurntSushi/toml v1.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect github.com/google/uuid v1.3.1 // indirect
@@ -33,10 +37,7 @@ require (
github.com/pion/logging v0.2.2 // indirect github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.12 // indirect
github.com/pion/rtp v1.8.5 // indirect
github.com/pion/sctp v1.8.12 // indirect github.com/pion/sctp v1.8.12 // indirect
github.com/pion/sdp/v3 v3.0.8 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/stun v0.6.1 // indirect github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.3 // indirect github.com/pion/transport/v2 v2.2.3 // indirect

View File

@@ -43,7 +43,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collection
// 创建一下个切片 // 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片 // 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
var newSegment bool var newSegment bool
if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { if (!t.HasVideo() || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
// 保存当前切片文件 // 保存当前切片文件
if t.ctx.file != nil { if t.ctx.file != nil {
err := t.flushSegment(false) err := t.flushSegment(false)

View File

@@ -8,6 +8,7 @@ import (
"github.com/lkmio/lkm/jt1078" "github.com/lkmio/lkm/jt1078"
"github.com/lkmio/lkm/record" "github.com/lkmio/lkm/record"
"github.com/lkmio/lkm/rtsp" "github.com/lkmio/lkm/rtsp"
"github.com/lkmio/lkm/transcode"
"github.com/lkmio/mpeg" "github.com/lkmio/mpeg"
"github.com/lkmio/rtp" "github.com/lkmio/rtp"
"github.com/lkmio/transport" "github.com/lkmio/transport"
@@ -28,7 +29,7 @@ import (
func init() { func init() {
stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.TSSupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs)
stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs)
@@ -182,6 +183,10 @@ func main() {
}() }()
} }
if transcode.CreateAudioTranscoder != nil {
log.Sugar.Info("启用音频转码功能")
}
// 开启pprof调试 // 开启pprof调试
err := http.ListenAndServe(":19999", nil) err := http.ListenAndServe(":19999", nil)
if err != nil { if err != nil {

View File

@@ -219,7 +219,7 @@ func (t *transStream) WriteHeader() error {
n += size n += size
t.sequenceHeader = t.sequenceHeader[:n] t.sequenceHeader = t.sequenceHeader[:n]
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) t.MWBuffer = stream.NewMergeWritingBuffer(t.HasVideo())
return nil return nil
} }

View File

@@ -46,7 +46,7 @@ type mergeWritingBuffer struct {
hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧 hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧
hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧 hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧
existVideo bool // 是否存在视频 hasVideo bool // 是否存在视频
} }
func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) { func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) {
@@ -108,7 +108,7 @@ func (m *mergeWritingBuffer) FlushSegment() (*collections.ReferenceCounter[[]byt
counter := collections.NewReferenceCounter(data) counter := collections.NewReferenceCounter(data)
// 遇到完整关键帧切片, 替代前一组 // 遇到完整关键帧切片, 替代前一组
// 或者只保留最近的音频切片 // 或者只保留最近的音频切片
if m.hasKeyVideoDataInCurrentSegment || !m.existVideo { if m.hasKeyVideoDataInCurrentSegment || !m.hasVideo {
for m.lastKeyVideoDataSegments.Size() > 0 { for m.lastKeyVideoDataSegments.Size() > 0 {
segment := m.lastKeyVideoDataSegments.Pop() segment := m.lastKeyVideoDataSegments.Pop()
segment.Release() segment.Release()
@@ -185,9 +185,9 @@ func (m *mergeWritingBuffer) Close() *collections.Queue[*mbBuffer] {
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
buffer := &mergeWritingBuffer{ buffer := &mergeWritingBuffer{
startTS: -1, startTS: -1,
existVideo: existVideo, hasVideo: existVideo,
buffers: collections.NewQueue[*mbBuffer](24), buffers: collections.NewQueue[*mbBuffer](24),
} }
if AppConfig.GOPCache { if AppConfig.GOPCache {

View File

@@ -26,9 +26,9 @@ type StreamEndInfo struct {
} }
func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool { func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool {
if len(info.Timestamps) != len(tracks) { //if len(info.Timestamps) != len(tracks) {
return false // return false
} //}
for _, track := range tracks { for _, track := range tracks {
if _, ok := info.Timestamps[track.Stream.CodecID]; !ok { if _, ok := info.Timestamps[track.Stream.CodecID]; !ok {

View File

@@ -72,29 +72,27 @@ type transStreamPublisher struct {
streamEvents *NonBlockingChannel[*StreamEvent] streamEvents *NonBlockingChannel[*StreamEvent]
mainContextEvents chan func() mainContextEvents chan func()
sinkCount int sinkCount int // 拉流计数
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop
recordSink Sink // 每个Source的录制流 recordSink Sink // 每个Source的录制流
recordFilePath string // 录制流文件路径 recordFilePath string // 录制流文件路径
hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟. hlsStream TransStream // HLS传输流
_ []transcode.Transcoder // 音频解码器 originTracks TrackManager // 推流的原始track
_ []transcode.Transcoder // 视频解码器 transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track
originTracks TrackManager // 推流的音视频Streams
allStreamTracks TrackManager // 推流Streams+转码器获得的Stream
transStreams map[TransStreamID]TransStream // 所有输出流 transStreams map[TransStreamID]TransStream // 所有输出流
forwardTransStream TransStream // 转发流 forwardTransStream TransStream // 转发流
sinks map[SinkID]Sink // 保存所有Sink sinks map[SinkID]Sink // 所有拉流Sink
transStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink transStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
existVideo bool // 是否存在视频 hasVideo bool // 是否存在视频
completed atomic.Bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true completed atomic.Bool // 推流track是否解析完毕
closed atomic.Bool closed atomic.Bool
streamEndInfo *StreamEndInfo // 之前推流源信息 streamEndInfo *StreamEndInfo // 之前推流源信息
accumulateTimestamps bool // 是否累加时间戳 accumulateTimestamps bool // 是否累加时间戳
timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳 timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳
lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 lastStreamEndTime time.Time // 最近结束拉流的时间
bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区 bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区
} }
@@ -109,6 +107,7 @@ func (t *transStreamPublisher) run() {
t.transStreams = make(map[TransStreamID]TransStream, 10) t.transStreams = make(map[TransStreamID]TransStream, 10)
t.sinks = make(map[SinkID]Sink, 128) t.sinks = make(map[SinkID]Sink, 128)
t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4)
defer func() { defer func() {
// 清空管道 // 清空管道
@@ -137,7 +136,7 @@ func (t *transStreamPublisher) run() {
if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascaded { if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascaded {
packets := event.Data.([][]byte) packets := event.Data.([][]byte)
for _, data := range packets { for _, data := range packets {
t.DispatchPacket(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]}) t.DispatchPacketToStream(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]})
UDPReceiveBufferPool.Put(data[:cap(data)]) UDPReceiveBufferPool.Put(data[:cap(data)])
} }
} }
@@ -188,8 +187,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
streams := t.originTracks.All() streams := t.originTracks.All()
utils.Assert(len(streams) > 0) utils.Assert(len(streams) > 0)
id := GenerateTransStreamID(TransStreamHls, streams...) hlsStream, err := t.CreateTransStream(TransStreamHls, streams, nil)
hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil)
if err != nil { if err != nil {
log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error()) log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error())
return return
@@ -197,20 +195,24 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() {
t.DispatchGOPBuffer(hlsStream) t.DispatchGOPBuffer(hlsStream)
t.hlsStream = hlsStream t.hlsStream = hlsStream
t.transStreams[id] = t.hlsStream
} }
} }
func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) { // 转码GOPBuffer中的音频
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source) func (t *transStreamPublisher) transcodeGOPBuffer(track *TranscodeTrack) {
if t.gopBuffer != nil {
source := SourceManager.Find(t.source) t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) {
utils.Assert(source != nil) if utils.AVMediaTypeAudio != packet.Get().MediaType {
transStream, err := CreateTransStream(source, protocol, tracks, sink) return
if err != nil { }
return nil, err track.Input(packet.Get())
})
} }
}
func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) {
// 匹配和创建适合TransStream流协议的track
var finalTracks []*Track
for _, track := range tracks { for _, track := range tracks {
supportedCodecs, ok := SupportedCodes[protocol] supportedCodecs, ok := SupportedCodes[protocol]
if !ok { if !ok {
@@ -220,28 +222,94 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
_, ok = supportedCodecs[track.Stream.CodecID] _, ok = supportedCodecs[track.Stream.CodecID]
if !ok { if !ok {
log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID) log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID)
continue // 尝试音频转码
if utils.AVMediaTypeAudio != track.Stream.MediaType || transcode.CreateAudioTranscoder == nil {
continue
}
transcodeTrack := t.transcodeTracks[track.Stream.CodecID]
if transcodeTrack == nil {
// 创建音频转码器
var codecs []utils.AVCodecID
for codec := range SupportedCodes[protocol] {
codecs = append(codecs, codec)
}
transcoder, stream, err := transcode.CreateAudioTranscoder(track.Stream, codecs)
if err != nil {
log.Sugar.Errorf("创建音频转码器失败 source: %s stream: %s codec: %s err: %s", t.source, protocol.String(), track.Stream.CodecID, err.Error())
continue
}
log.Sugar.Infof("创建音频转码器成功 source: %s stream: %s src: %s dst: %s", t.source, protocol.String(), track.Stream.CodecID, transcoder.GetEncoderID())
stream.Index = len(t.originTracks.tracks) + len(t.transcodeTracks)
newTrack := &Track{Stream: stream}
// 如果之前有转码过, 则使用之前的时间戳
if t.streamEndInfo != nil {
oldTimestamps, ok := t.streamEndInfo.Timestamps[transcoder.GetEncoderID()]
if ok {
newTrack.Dts = oldTimestamps[0]
newTrack.Pts = oldTimestamps[1]
}
}
transcodeTrack = NewTranscodeTrack(newTrack, transcoder)
t.transcodeTracks[track.Stream.CodecID] = transcodeTrack
// 转码GOPBuffer中的音频
t.transcodeGOPBuffer(transcodeTrack)
}
track = transcodeTrack.track
} }
var index int
// 重新拷贝一个track传输流内部使用track的时间戳 // 重新拷贝一个track传输流内部使用track的时间戳
newTrack := *track newTrack := *track
if index, err = transStream.AddTrack(&newTrack); err != nil { finalTracks = append(finalTracks, &newTrack)
}
if len(finalTracks) < 1 {
return nil, fmt.Errorf("not found track")
}
id := GenerateTransStreamID(protocol, finalTracks...)
// 如果已经存在该id的输出流, 则直接返回
oldTransStream := t.transStreams[id]
if oldTransStream != nil {
return oldTransStream, nil
}
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source)
source := SourceManager.Find(t.source)
utils.Assert(source != nil)
transStream, err := CreateTransStream(source, protocol, tracks, sink)
if err != nil {
return nil, err
}
for _, track := range finalTracks {
index, err := transStream.AddTrack(track)
if err != nil {
log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID) log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID)
continue continue
} }
// stream index->muxer track index // stream index->muxer track index
transStream.SetMuxerTrack(index, &newTrack) transStream.SetMuxerTrack(index, track)
} }
if transStream.TrackSize() == 0 { if transStream.TrackSize() == 0 {
return nil, fmt.Errorf("not found track") return nil, fmt.Errorf("not found track")
} }
id = GenerateTransStreamID(protocol, transStream.GetTracks()...)
transStream.SetID(id) transStream.SetID(id)
transStream.SetProtocol(protocol) transStream.SetProtocol(protocol)
t.transStreams[id] = transStream
// 创建输出流对应的拉流队列 // 创建输出流对应的拉流队列
t.transStreamSinks[id] = make(map[SinkID]Sink, 128) t.transStreamSinks[id] = make(map[SinkID]Sink, 128)
_ = transStream.WriteHeader() _ = transStream.WriteHeader()
@@ -257,16 +325,36 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran
func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) { func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) {
if t.gopBuffer != nil { if t.gopBuffer != nil {
t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) {
t.DispatchPacket(transStream, packet.Get()) t.DispatchPacketToStream(transStream, packet.Get())
}) })
// 发送转码包
for _, track := range t.transcodeTracks {
size := track.packets.Size()
for i := 0; i < size; i++ {
t.DispatchPacketToStream(transStream, track.packets.Peek(i))
}
}
} }
} }
// DispatchPacket 分发AVPacket func (t *transStreamPublisher) DispatchPacket(packet *avformat.AVPacket) {
func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) { for _, transStream := range t.transStreams {
if TransStreamGBCascaded == transStream.GetProtocol() {
continue
}
t.DispatchPacketToStream(transStream, packet)
}
}
// DispatchPacketToStream 分发AVPacket
func (t *transStreamPublisher) DispatchPacketToStream(transStream TransStream, packet *avformat.AVPacket) {
trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index) trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index)
if !ok { if !ok {
return return
} else if !transStream.GetID().HasTrack(packet.Index) {
return
} }
data, timestamp, videoKey, err := transStream.Input(packet, trackIndex) data, timestamp, videoKey, err := transStream.Input(packet, trackIndex)
@@ -280,7 +368,7 @@ func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *a
// DispatchBuffer 分发传输流 // DispatchBuffer 分发传输流
func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) {
sinks := t.transStreamSinks[transStream.GetID()] sinks := t.transStreamSinks[transStream.GetID()]
exist := transStream.IsExistVideo() exist := transStream.HasVideo()
for _, sink := range sinks { for _, sink := range sinks {
@@ -371,20 +459,17 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
tracks = append(tracks, track) tracks = append(tracks, track)
} }
transStreamId := GenerateTransStreamID(sink.GetProtocol(), tracks...) transStream, exist := t.transStreams[GenerateTransStreamID(sink.GetProtocol(), tracks...)]
transStream, exist := t.transStreams[transStreamId]
if !exist { if !exist {
var err error var err error
transStream, err = t.CreateTransStream(transStreamId, sink.GetProtocol(), tracks, sink) transStream, err = t.CreateTransStream(sink.GetProtocol(), tracks, sink)
if err != nil { if err != nil {
log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), t.source) log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), t.source)
return false return false
} }
t.transStreams[transStreamId] = transStream
} }
sink.SetTransStreamID(transStreamId) sink.SetTransStreamID(transStream.GetID())
{ {
sink.Lock() sink.Lock()
@@ -398,6 +483,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
sink.UnLock() sink.UnLock()
} }
// 开始推流
err := sink.StartStreaming(transStream) err := sink.StartStreaming(transStream)
if err != nil { if err != nil {
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkID2String(sink.GetID()), t.source) log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkID2String(sink.GetID()), t.source)
@@ -416,7 +502,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
} }
t.sinks[sink.GetID()] = sink t.sinks[sink.GetID()] = sink
t.transStreamSinks[transStreamId][sink.GetID()] = sink t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
_, ok := sink.GetConn().(*transport.Conn) _, ok := sink.GetConn().(*transport.Conn)
@@ -436,7 +522,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool {
} }
// 新建传输流,发送已经缓存的音视频帧 // 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && t.existVideo && TransStreamGBCascaded != transStream.GetProtocol() { if !exist && AppConfig.GOPCache && t.hasVideo && TransStreamGBCascaded != transStream.GetProtocol() {
t.DispatchGOPBuffer(transStream) t.DispatchGOPBuffer(transStream)
} }
@@ -516,6 +602,11 @@ func (t *transStreamPublisher) doClose() {
t.gopBuffer = nil t.gopBuffer = nil
} }
// 关闭转码器
for _, track := range t.transcodeTracks {
track.Close()
}
// 关闭录制流 // 关闭录制流
if t.recordSink != nil { if t.recordSink != nil {
t.recordSink.Close() t.recordSink.Close()
@@ -523,7 +614,16 @@ func (t *transStreamPublisher) doClose() {
// 保留推流信息 // 保留推流信息
if t.sinkCount > 0 && len(t.originTracks.All()) > 0 { if t.sinkCount > 0 && len(t.originTracks.All()) > 0 {
sourceHistory := StreamEndInfoBride(t.source, t.originTracks.All(), t.transStreams) var tracks []*Track
for _, track := range t.originTracks.All() {
tracks = append(tracks, track)
}
for _, track := range t.transcodeTracks {
tracks = append(tracks, track.track)
}
sourceHistory := StreamEndInfoBride(t.source, tracks, t.transStreams)
streamEndInfoManager.Add(sourceHistory) streamEndInfoManager.Add(sourceHistory)
} }
@@ -582,8 +682,7 @@ func (t *transStreamPublisher) WriteHeader() {
t.streamEndInfo = streamInfo t.streamEndInfo = streamInfo
// 恢复每路track的时间戳 // 恢复每路track的时间戳
tracks := t.originTracks.All() for _, track := range t.originTracks.All() {
for _, track := range tracks {
timestamps := streamInfo.Timestamps[track.Stream.CodecID] timestamps := streamInfo.Timestamps[track.Stream.CodecID]
track.Dts = timestamps[0] track.Dts = timestamps[0]
track.Pts = timestamps[1] track.Pts = timestamps[1]
@@ -613,7 +712,7 @@ func (t *transStreamPublisher) WriteHeader() {
} }
// 如果不存在视频帧, 清空GOP缓存 // 如果不存在视频帧, 清空GOP缓存
if !t.existVideo { if !t.hasVideo {
t.ClearGopBuffer(false) t.ClearGopBuffer(false)
t.gopBuffer = nil t.gopBuffer = nil
} }
@@ -643,11 +742,16 @@ func (t *transStreamPublisher) ClearGopBuffer(free bool) {
t.bitstreamFilterBuffer.Pop() t.bitstreamFilterBuffer.Pop()
} }
}) })
// 丢弃转码track中的缓存
for _, track := range t.transcodeTracks {
track.Clear()
}
} }
func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) { func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) {
// 保存到GOP缓存 // 保存到GOP缓存
if (AppConfig.GOPCache && t.existVideo) || !t.completed.Load() { if (AppConfig.GOPCache && t.hasVideo) || !t.completed.Load() {
packet.Get().OnBufferAlloc = func(size int) []byte { packet.Get().OnBufferAlloc = func(size int) []byte {
if t.bitstreamFilterBuffer == nil { if t.bitstreamFilterBuffer == nil {
t.bitstreamFilterBuffer = collections.NewRBBlockBuffer(1024 * 1024 * 2) t.bitstreamFilterBuffer = collections.NewRBBlockBuffer(1024 * 1024 * 2)
@@ -669,14 +773,18 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av
t.CorrectTimestamp(packet.Get()) t.CorrectTimestamp(packet.Get())
// 分发给各个传输流 // 分发给各个传输流
for _, transStream := range t.transStreams { t.DispatchPacket(packet.Get())
if TransStreamGBCascaded != transStream.GetProtocol() {
t.DispatchPacket(transStream, packet.Get()) // 转码
for _, track := range t.transcodeTracks {
transcodePackets := track.Input(packet.Get())
for _, transcodePkt := range transcodePackets {
t.DispatchPacket(transcodePkt)
} }
} }
// 未开启GOP缓存或只存在音频流, 立即释放 // 未开启GOP缓存或只存在音频流, 立即释放
if !AppConfig.GOPCache || !t.existVideo { if !AppConfig.GOPCache || !t.hasVideo {
packet.Release() packet.Release()
} }
} }
@@ -687,7 +795,7 @@ func (t *transStreamPublisher) OnNewTrack(track *Track) {
t.originTracks.Add(track) t.originTracks.Add(track)
if utils.AVMediaTypeVideo == stream.MediaType { if utils.AVMediaTypeVideo == stream.MediaType {
t.existVideo = true t.hasVideo = true
} }
// 创建GOPBuffer // 创建GOPBuffer
@@ -704,10 +812,13 @@ func (t *transStreamPublisher) CorrectTimestamp(packet *avformat.AVPacket) {
timestamps := t.streamEndInfo.Timestamps[packet.CodecID] timestamps := t.streamEndInfo.Timestamps[packet.CodecID]
t.accumulateTimestamps = true t.accumulateTimestamps = true
log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1]) log.Sugar.Infof("使用上次推流的时间戳 dts: %d, pts: %d", timestamps[0], timestamps[1])
} }
track := t.originTracks.Find(packet.CodecID) track := t.originTracks.Find(packet.CodecID)
if track == nil {
return
}
duration := packet.GetDuration(packet.Timebase) duration := packet.GetDuration(packet.Timebase)
// 根据duration来累加时间戳 // 根据duration来累加时间戳
@@ -731,7 +842,7 @@ func (t *transStreamPublisher) GetStreamEndInfo() *StreamEndInfo {
} }
func (t *transStreamPublisher) TranscodeTracks() []*Track { func (t *transStreamPublisher) TranscodeTracks() []*Track {
return t.allStreamTracks.All() return t.originTracks.All()
} }
func (t *transStreamPublisher) LastStreamEndTime() time.Time { func (t *transStreamPublisher) LastStreamEndTime() time.Time {

View File

@@ -48,7 +48,7 @@ type TransStream interface {
// Close 关闭传输流, 返回还未flush的合并写块 // Close 关闭传输流, 返回还未flush的合并写块
Close() ([]*collections.ReferenceCounter[[]byte], int64, error) Close() ([]*collections.ReferenceCounter[[]byte], int64, error)
IsExistVideo() bool HasVideo() bool
IsTCPStreaming() bool IsTCPStreaming() bool
@@ -60,7 +60,7 @@ type BaseTransStream struct {
Tracks []*Track Tracks []*Track
MuxerIndex map[int]int // stream index->muxer track index MuxerIndex map[int]int // stream index->muxer track index
Completed bool Completed bool
ExistVideo bool hasVideo bool
Protocol TransStreamProtocol Protocol TransStreamProtocol
OutBuffer []*collections.ReferenceCounter[[]byte] // 传输流的合并写块队列 OutBuffer []*collections.ReferenceCounter[[]byte] // 传输流的合并写块队列
@@ -90,7 +90,7 @@ func (t *BaseTransStream) AddTrack(track *Track) (int, error) {
func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) { func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) {
t.Tracks = append(t.Tracks, track) t.Tracks = append(t.Tracks, track)
if utils.AVMediaTypeVideo == track.Stream.MediaType { if utils.AVMediaTypeVideo == track.Stream.MediaType {
t.ExistVideo = true t.hasVideo = true
} }
if t.MuxerIndex == nil { if t.MuxerIndex == nil {
@@ -159,8 +159,8 @@ func (t *BaseTransStream) GetTracks() []*Track {
return t.Tracks return t.Tracks
} }
func (t *BaseTransStream) IsExistVideo() bool { func (t *BaseTransStream) HasVideo() bool {
return t.ExistVideo return t.hasVideo
} }
func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) {

View File

@@ -2,32 +2,19 @@ package stream
import "github.com/lkmio/avformat/utils" import "github.com/lkmio/avformat/utils"
// TransStreamID 每个传输流的唯一Id根据输出流协议ID+流包含的音视频编码器ID生成 // TransStreamID 每个传输流的唯一Id根据输出流协议ID+track index生成
// 输出流协议ID占用高8位 // 输出流协议占低8位
// 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流. // 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流.
type TransStreamID uint64 type TransStreamID uint64
var ( func (id TransStreamID) HasTrack(index int) bool {
// AVCodecID转为byte的对应关系 for i := 1; i < 8; i++ {
narrowCodecIds map[int]byte if int(id>>(i*8))&0xFF == index {
) return true
}
func init() {
narrowCodecIds = map[int]byte{
int(utils.AVCodecIdH263): 0x1,
int(utils.AVCodecIdH264): 0x2,
int(utils.AVCodecIdH265): 0x3,
int(utils.AVCodecIdAV1): 0x4,
int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105,
int(utils.AVCodecIdADPCMG722): 106,
} }
return false
} }
// GenerateTransStreamID 根据传入的推拉流协议和编码器ID生成StreamId // GenerateTransStreamID 根据传入的推拉流协议和编码器ID生成StreamId
@@ -54,16 +41,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans
len_ := len(tracks) len_ := len(tracks)
utils.Assert(len_ > 0 && len_ < 8) utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64 var streamId = uint64(protocol) & 0xFF
streamId = uint64(protocol) << 56
for i, track := range tracks { for i, track := range tracks {
id, ok := narrowCodecIds[int(track.Stream.CodecID)] streamId |= uint64(track.Stream.Index) << ((i + 1) * 8)
if ok {
id = byte(track.Stream.CodecID)
}
streamId |= uint64(id) << (48 - i*8)
} }
return TransStreamID(streamId) return TransStreamID(streamId)

74
stream/transcode_track.go Normal file
View File

@@ -0,0 +1,74 @@
package stream
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/transcode"
"sync"
)
var (
AudioPacketDataPool = sync.Pool{
New: func() interface{} {
return make([]byte, 48000)
},
}
)
type TranscodeTrack struct {
track *Track
packets *collections.Queue[*avformat.AVPacket]
transcoder transcode.Transcoder
result []*avformat.AVPacket
dst int64
pts int64
}
func (t *TranscodeTrack) Input(packet *avformat.AVPacket) []*avformat.AVPacket {
t.result = t.result[:0]
_, _ = t.transcoder.Transcode(packet, func(bytes []byte, duration int) {
dstPkt := avformat.PacketPool.Get().(*avformat.AVPacket)
*dstPkt = *packet
data := AudioPacketDataPool.Get().([]byte)
copy(data, bytes)
dstPkt.Data = data[:len(bytes)]
dstPkt.Timebase = 1000
dstPkt.Pts = t.pts
dstPkt.Dts = t.pts
dstPkt.Duration = int64(duration)
dstPkt.CodecID = t.track.Stream.CodecID
dstPkt.Index = t.track.Stream.Index
t.pts += dstPkt.Duration
t.dst += dstPkt.Duration
t.packets.Push(dstPkt)
t.result = append(t.result, dstPkt)
})
return t.result
}
func (t *TranscodeTrack) Clear() {
for t.packets.Size() > 0 {
packet := t.packets.Pop()
AudioPacketDataPool.Put(packet.Data[:cap(packet.Data)])
avformat.FreePacket(packet)
}
}
func (t *TranscodeTrack) Close() {
t.transcoder.Close()
t.Clear()
}
func NewTranscodeTrack(track *Track, transcoder transcode.Transcoder) *TranscodeTrack {
return &TranscodeTrack{
track: track,
transcoder: transcoder,
packets: collections.NewQueue[*avformat.AVPacket](128),
}
}

View File

@@ -0,0 +1,126 @@
//go:build audio_transcode
// +build audio_transcode
package transcode
import (
"fmt"
audio_transcoder "github.com/lkmio/audio-transcoder"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
)
func init() {
CreateAudioTranscoder = NewAudioTranscoder
}
type AudioTranscoder struct {
decoder audio_transcoder.Decoder
encoder audio_transcoder.Encoder
encoderID utils.AVCodecID
// reSampler audio_transcoder.Resampler
pcmData []byte
pktData []byte
}
func (t *AudioTranscoder) Transcode(src *avformat.AVPacket, cb func([]byte, int)) (int, error) {
if src.MediaType != utils.AVMediaTypeAudio {
return 0, fmt.Errorf("unsupported media type: %s", src.MediaType.String())
}
pcmN, err := t.decoder.Decode(src.Data, t.pcmData)
if err != nil {
return 0, err
} else if pcmN < 1 {
return 0, nil
}
pktN, err := t.encoder.Encode(t.pcmData[:pcmN], func(bytes []byte) {
cb(bytes, t.encoder.PacketDurationMS())
})
return pktN, nil
}
func (t *AudioTranscoder) Close() {
t.decoder.Destroy()
t.encoder.Destroy()
}
func (t *AudioTranscoder) GetEncoderID() utils.AVCodecID {
return t.encoderID
}
func NewAudioTranscoder(src *avformat.AVStream, dst []utils.AVCodecID) (Transcoder, *avformat.AVStream, error) {
decoder := audio_transcoder.FindDecoder(src.CodecID.String())
if decoder == nil {
return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String())
}
var err error
var encoder audio_transcoder.Encoder
var dstCodec utils.AVCodecID
for _, codec := range dst {
encoder, err = audio_transcoder.FindEncoder(codec.String(), src.SampleRate, src.Channels)
if encoder != nil {
dstCodec = codec
break
}
}
if err != nil {
return nil, nil, err
} else if encoder == nil {
return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String())
}
switch src.CodecID {
case utils.AVCodecIdAAC:
if err = decoder.(*audio_transcoder.AACDecoder).Create(nil, src.Data); err != nil {
return nil, nil, err
}
break
case utils.AVCodecIdOPUS:
if err = decoder.(*audio_transcoder.OpusDecoder).Create(src.SampleRate, src.Channels); err != nil {
return nil, nil, err
}
break
}
adtsHeader := 1
switch dstCodec {
case utils.AVCodecIdAAC:
if _, err = encoder.(*audio_transcoder.AACEncoder).Create(src.SampleRate, src.Channels, adtsHeader); err != nil {
decoder.Destroy()
return nil, nil, err
}
break
case utils.AVCodecIdOPUS:
if _, err = encoder.(*audio_transcoder.OpusEncoder).Create(src.SampleRate, src.Channels); err != nil {
decoder.Destroy()
return nil, nil, err
}
}
dstStream := &avformat.AVStream{}
*dstStream = *src
dstStream.CodecID = dstCodec
dstStream.Timebase = 1000
if data := encoder.ExtraData(); data != nil {
dstStream.Data = make([]byte, len(data))
copy(dstStream.Data, data)
}
if utils.AVCodecIdAAC == dstCodec {
dstStream.HasADTSHeader = adtsHeader == 1
}
return &AudioTranscoder{
decoder: decoder,
encoder: encoder,
pcmData: make([]byte, src.SampleRate*src.Channels*2),
pktData: make([]byte, src.SampleRate*src.Channels*2),
encoderID: dstCodec,
}, dstStream, nil
}

View File

@@ -1,4 +1,18 @@
package transcode package transcode
import (
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/utils"
)
var (
CreateAudioTranscoder func(src *avformat.AVStream, dst []utils.AVCodecID) (Transcoder, *avformat.AVStream, error)
)
type Transcoder interface { type Transcoder interface {
Transcode(src *avformat.AVPacket, cb func([]byte, int)) (int, error)
GetEncoderID() utils.AVCodecID
Close()
} }

View File

@@ -50,7 +50,7 @@
let remote_view = document.getElementById("videoview"); let remote_view = document.getElementById("videoview");
let source = document.getElementById("source").value; let source = document.getElementById("source").value;
let pc = new RTCPeerConnection(null); let pc = new RTCPeerConnection(null);
// pc.addTransceiver("audio", {direction: "recvonly"}); pc.addTransceiver("audio", {direction: "recvonly"});
pc.addTransceiver("video", {direction: "recvonly"}); pc.addTransceiver("video", {direction: "recvonly"});
let offer = await pc.createOffer(); let offer = await pc.createOffer();