From 111d2121e2f9ef695a7495535d8a44df56e5b4cc Mon Sep 17 00:00:00 2001 From: ydajiang Date: Mon, 7 Jul 2025 09:15:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81pcm=E9=9F=B3=E9=A2=91?= =?UTF-8?q?=E8=BD=AC=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gb28181/publish_test.go | 15 ++++++-- stream/stream_publisher.go | 45 ++++++++++++++++++++-- stream/trans_utils.go | 4 ++ transcode/audio_transcoder.go | 71 +++++++++++++++++++++++++---------- 4 files changed, 109 insertions(+), 26 deletions(-) diff --git a/gb28181/publish_test.go b/gb28181/publish_test.go index 5d7de09..a7c534f 100644 --- a/gb28181/publish_test.go +++ b/gb28181/publish_test.go @@ -196,10 +196,19 @@ func modifySSRC(data []byte, ssrc uint32) { } // 使用wireshark直接导出的rtp流 -// 根据ssrc来查找每个rtp包, rtp不要带扩展字段 +// udp需要填写对应的ssrc来查找来分割rtp包 func TestPublish(t *testing.T) { - path := "../../source_files/gb28181_tcp_h264_pcma.raw" - var rawSsrc uint32 = 0xBEBC201 + //path := "../../source_files/rtp_ps_h264_pcm8k_0xBEBC206.raw" + //var rawSsrc uint32 = 0xBEBC206 + //path := "../../source_files/rtp_ps_h264_pcm16k_0xBEBC203.raw" + //var rawSsrc uint32 = 0xBEBC203 + + //path := "../../source_files/rtp_ps_h264_pcm32k_0xBEBC207.raw" + //var rawSsrc uint32 = 0xBEBC207 + + path := "../../source_files/rtp_ps_h264_G7221_0xBEBC204.raw" + var rawSsrc uint32 = 0xBEBC204 + localAddr := "0.0.0.0:20001" id := "hls_mystream" diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 10dbca0..18b564b 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -5,6 +5,7 @@ import ( "github.com/lkmio/avformat" "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" + "github.com/lkmio/flv" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/transcode" "github.com/lkmio/transport" @@ -214,12 +215,29 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t // 匹配和创建适合TransStream流协议的track var finalTracks []*Track for _, track := range tracks { + // 对应传输流支持的编码器列表 supportedCodecs, ok := SupportedCodes[protocol] if !ok { panic(fmt.Sprintf("unknown protocol %s", protocol.String())) } + // 是否支持该编码器 _, ok = supportedCodecs[track.Stream.CodecID] + + // 如果PCM采样率不符合FLV的标准, 也开启转码 + if ok && utils.AVCodecIdPCMS16LE == track.Stream.CodecID && (TransStreamRtmp == protocol || TransStreamFlv == protocol) { + for _, sampleRate := range flv.SupportedSampleRates { + ok = sampleRate == track.Stream.SampleRate + if ok { + break + } + } + + if !ok { + log.Sugar.Warnf("FLV不支持的PCM采样率 source: %s stream: %s sampleRate: %d", t.source, protocol.String(), track.Stream.SampleRate) + } + } + if !ok { log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID) // 尝试音频转码 @@ -227,11 +245,19 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t continue } - transcodeTrack := t.transcodeTracks[track.Stream.CodecID] + var transcodeTrack *TranscodeTrack + // 从已经存在的转码track中查找传输流支持的编码器 + for _, old := range t.transcodeTracks { + if _, ok = supportedCodecs[old.transcoder.GetEncoderID()]; ok { + transcodeTrack = old + break + } + } + if transcodeTrack == nil { // 创建音频转码器 var codecs []utils.AVCodecID - for codec := range SupportedCodes[protocol] { + for codec := range supportedCodecs { codecs = append(codecs, codec) } @@ -260,6 +286,8 @@ func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, t // 转码GOPBuffer中的音频 t.transcodeGOPBuffer(transcodeTrack) + } else { + log.Sugar.Infof("使用已经存在的音频转码track source: %s stream: %s src: %s dst: %s", t.source, protocol.String(), track.Stream.CodecID, transcodeTrack.transcoder.GetEncoderID()) } track = transcodeTrack.track @@ -459,7 +487,18 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { tracks = append(tracks, track) } - transStream, exist := t.transStreams[GenerateTransStreamID(sink.GetProtocol(), tracks...)] + var transStream TransStream + var exist bool + // 查找已经存在的传输流 + for _, stream := range t.transStreams { + if stream.GetID().Protocol() == sink.GetProtocol() { + transStream = stream + exist = true + break + } + } + + // 不存在创建新的传输流 if !exist { var err error transStream, err = t.CreateTransStream(sink.GetProtocol(), tracks, sink) diff --git a/stream/trans_utils.go b/stream/trans_utils.go index df6e7e7..c11e313 100644 --- a/stream/trans_utils.go +++ b/stream/trans_utils.go @@ -17,6 +17,10 @@ func (id TransStreamID) HasTrack(index int) bool { return false } +func (id TransStreamID) Protocol() TransStreamProtocol { + return TransStreamProtocol(id & 0xFF) +} + // GenerateTransStreamID 根据传入的推拉流协议和编码器ID生成StreamId // 请确保ids根据值升序排序传参 /*func GenerateTransStreamID(protocol GetProtocol, ids ...utils.AVCodecID) GetTransStreamID { diff --git a/transcode/audio_transcoder.go b/transcode/audio_transcoder.go index 822e48d..fc022aa 100644 --- a/transcode/audio_transcoder.go +++ b/transcode/audio_transcoder.go @@ -28,14 +28,21 @@ func (t *AudioTranscoder) Transcode(src *avformat.AVPacket, cb func([]byte, int) return 0, fmt.Errorf("unsupported media type: %s", src.MediaType.String()) } - pcmN, err := t.decoder.Decode(src.Data, t.pcmData) + var err error + pcmData := src.Data + pcmN := len(pcmData) + if t.decoder != nil { + pcmN, err = t.decoder.Decode(src.Data, t.pcmData) + pcmData = t.pcmData + } + if err != nil { return 0, err } else if pcmN < 1 { return 0, nil } - pktN, err := t.encoder.Encode(t.pcmData[:pcmN], func(bytes []byte) { + pktN, err := t.encoder.Encode(pcmData[:pcmN], func(bytes []byte) { cb(bytes, t.encoder.PacketDurationMS()) }) @@ -43,7 +50,9 @@ func (t *AudioTranscoder) Transcode(src *avformat.AVPacket, cb func([]byte, int) } func (t *AudioTranscoder) Close() { - t.decoder.Destroy() + if t.decoder != nil { + t.decoder.Destroy() + } t.encoder.Destroy() } @@ -52,13 +61,32 @@ func (t *AudioTranscoder) GetEncoderID() utils.AVCodecID { } func NewAudioTranscoder(src *avformat.AVStream, dst []utils.AVCodecID) (Transcoder, *avformat.AVStream, error) { - decoder := audio_transcoder.FindDecoder(src.CodecID.String()) - if decoder == nil { - return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String()) + var err error + var decoder audio_transcoder.Decoder + var encoder audio_transcoder.Encoder + + defer func() { + if err == nil { + return + } + + if decoder != nil { + decoder.Destroy() + } + if encoder != nil { + encoder.Destroy() + } + }() + + // 如果是pcm数据, 不需要解码 + if utils.AVCodecIdPCMS16LE != src.CodecID { + decoder = audio_transcoder.FindDecoder(src.CodecID.String()) + if decoder == nil { + return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String()) + } } - var err error - var encoder audio_transcoder.Encoder + // 查找合适的编码器 var dstCodec utils.AVCodecID for _, codec := range dst { encoder, err = audio_transcoder.FindEncoder(codec.String(), src.SampleRate, src.Channels) @@ -74,30 +102,33 @@ func NewAudioTranscoder(src *avformat.AVStream, dst []utils.AVCodecID) (Transcod return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String()) } - switch src.CodecID { - case utils.AVCodecIdAAC: - if err = decoder.(*audio_transcoder.AACDecoder).Create(nil, src.Data); err != nil { - return nil, nil, err + // 创建解码器 + if decoder != nil { + switch src.CodecID { + case utils.AVCodecIdAAC: + if err = decoder.(*audio_transcoder.AACDecoder).Create(nil, src.Data); err != nil { + return nil, nil, err + } + break + case utils.AVCodecIdOPUS: + if err = decoder.(*audio_transcoder.OpusDecoder).Create(src.SampleRate, src.Channels); err != nil { + return nil, nil, err + } + break } - break - case utils.AVCodecIdOPUS: - if err = decoder.(*audio_transcoder.OpusDecoder).Create(src.SampleRate, src.Channels); err != nil { - return nil, nil, err - } - break } + // aac编码是否添加adts头 adtsHeader := 1 + // 创建编码器 switch dstCodec { case utils.AVCodecIdAAC: if _, err = encoder.(*audio_transcoder.AACEncoder).Create(src.SampleRate, src.Channels, adtsHeader); err != nil { - decoder.Destroy() return nil, nil, err } break case utils.AVCodecIdOPUS: if _, err = encoder.(*audio_transcoder.OpusEncoder).Create(src.SampleRate, src.Channels); err != nil { - decoder.Destroy() return nil, nil, err } }