diff --git a/flv/http_flv.go b/flv/http_flv.go index 4e3bd9f..d84933a 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -121,7 +121,7 @@ func (t *TransStream) WriteHeader() error { writeSeparator(t.flvHeaderBlock) writeSeparator(t.flvExtraDataBlock) - t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) + t.MWBuffer = stream.NewMergeWritingBuffer(t.HasVideo()) return nil } diff --git a/gb28181/publish_test.go b/gb28181/publish_test.go index 6196c80..5d7de09 100644 --- a/gb28181/publish_test.go +++ b/gb28181/publish_test.go @@ -198,7 +198,7 @@ func modifySSRC(data []byte, ssrc uint32) { // 使用wireshark直接导出的rtp流 // 根据ssrc来查找每个rtp包, rtp不要带扩展字段 func TestPublish(t *testing.T) { - path := "../../source_files/gb28181_h264.rtp" + path := "../../source_files/gb28181_tcp_h264_pcma.raw" var rawSsrc uint32 = 0xBEBC201 localAddr := "0.0.0.0:20001" id := "hls_mystream" @@ -319,12 +319,14 @@ func TestPublish(t *testing.T) { func TestDecode(t *testing.T) { t.Run("decode_raw", func(t *testing.T) { - file, err2 := os.ReadFile("../dump/gb28181-192.168.2.103.37841") + file, err2 := os.ReadFile("../../source_files/gb28181-114.103.207.33.20872") if err2 != nil { panic(err2) } - filter := NewSingleFilter(NewPassiveSource()) + source := NewPassiveSource() + source.Init() + filter := NewSingleFilter(source) session := NewTCPSession(nil, filter) reader := bufio.NewBytesReader(file) diff --git a/go.mod b/go.mod index dab1c91..708f272 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module github.com/lkmio/lkm require ( - github.com/lkmio/mpeg v0.0.0 + github.com/lkmio/audio-transcoder v0.0.0-20250702123727-6c54138868c6 github.com/lkmio/flv v0.0.0 + github.com/lkmio/mpeg v0.0.0 github.com/lkmio/rtmp v0.0.0 - github.com/lkmio/transport v0.0.0 github.com/lkmio/rtp v0.0.0 + github.com/lkmio/transport v0.0.0 ) require ( @@ -14,13 +15,16 @@ require ( github.com/lkmio/avformat v0.0.0 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/pion/interceptor v0.1.25 + github.com/pion/rtcp v1.2.12 + github.com/pion/rtp v1.8.5 + github.com/pion/sdp/v3 v3.0.8 github.com/pion/webrtc/v3 v3.2.29 github.com/sirupsen/logrus v1.9.3 github.com/x-cray/logrus-prefixed-formatter v0.5.2 go.uber.org/zap v1.27.0 ) -require ( +require ( github.com/BurntSushi/toml v1.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.3.1 // indirect @@ -33,10 +37,7 @@ require ( github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.12 // indirect - github.com/pion/rtp v1.8.5 // indirect github.com/pion/sctp v1.8.12 // indirect - github.com/pion/sdp/v3 v3.0.8 // indirect github.com/pion/srtp/v2 v2.0.18 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pion/transport/v2 v2.2.3 // indirect diff --git a/hls/hls_stream.go b/hls/hls_stream.go index a91a01a..3321690 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -43,7 +43,7 @@ func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collection // 创建一下个切片 // 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片 var newSegment bool - if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { + if (!t.HasVideo() || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { // 保存当前切片文件 if t.ctx.file != nil { err := t.flushSegment(false) diff --git a/main.go b/main.go index e14a8a8..ae2e3b9 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/lkmio/lkm/jt1078" "github.com/lkmio/lkm/record" "github.com/lkmio/lkm/rtsp" + "github.com/lkmio/lkm/transcode" "github.com/lkmio/mpeg" "github.com/lkmio/rtp" "github.com/lkmio/transport" @@ -28,7 +29,7 @@ import ( func init() { stream.RegisterTransStreamFactory(stream.TransStreamRtmp, rtmp.TransStreamFactory, flv2.SupportedCodecs) - stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.SupportedCodecs) + stream.RegisterTransStreamFactory(stream.TransStreamHls, hls.TransStreamFactory, mpeg.TSSupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamFlv, flv.TransStreamFactory, flv2.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamRtsp, rtsp.TransStreamFactory, rtp.SupportedCodecs) stream.RegisterTransStreamFactory(stream.TransStreamRtc, rtc.TransStreamFactory, rtc.SupportedCodecs) @@ -182,6 +183,10 @@ func main() { }() } + if transcode.CreateAudioTranscoder != nil { + log.Sugar.Info("启用音频转码功能") + } + // 开启pprof调试 err := http.ListenAndServe(":19999", nil) if err != nil { diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index ae292f7..ed9bef3 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -219,7 +219,7 @@ func (t *transStream) WriteHeader() error { n += size t.sequenceHeader = t.sequenceHeader[:n] - t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo) + t.MWBuffer = stream.NewMergeWritingBuffer(t.HasVideo()) return nil } diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index 3210ab3..a10169b 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -46,7 +46,7 @@ type mergeWritingBuffer struct { hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧 hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧 - existVideo bool // 是否存在视频 + hasVideo bool // 是否存在视频 } func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) { @@ -108,7 +108,7 @@ func (m *mergeWritingBuffer) FlushSegment() (*collections.ReferenceCounter[[]byt counter := collections.NewReferenceCounter(data) // 遇到完整关键帧切片, 替代前一组 // 或者只保留最近的音频切片 - if m.hasKeyVideoDataInCurrentSegment || !m.existVideo { + if m.hasKeyVideoDataInCurrentSegment || !m.hasVideo { for m.lastKeyVideoDataSegments.Size() > 0 { segment := m.lastKeyVideoDataSegments.Pop() segment.Release() @@ -185,9 +185,9 @@ func (m *mergeWritingBuffer) Close() *collections.Queue[*mbBuffer] { func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { buffer := &mergeWritingBuffer{ - startTS: -1, - existVideo: existVideo, - buffers: collections.NewQueue[*mbBuffer](24), + startTS: -1, + hasVideo: existVideo, + buffers: collections.NewQueue[*mbBuffer](24), } if AppConfig.GOPCache { diff --git a/stream/stream_endinfo.go b/stream/stream_endinfo.go index fa1331e..1a73134 100644 --- a/stream/stream_endinfo.go +++ b/stream/stream_endinfo.go @@ -26,9 +26,9 @@ type StreamEndInfo struct { } func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool { - if len(info.Timestamps) != len(tracks) { - return false - } + //if len(info.Timestamps) != len(tracks) { + // return false + //} for _, track := range tracks { if _, ok := info.Timestamps[track.Stream.CodecID]; !ok { diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 584cc43..10dbca0 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -72,29 +72,27 @@ type transStreamPublisher struct { streamEvents *NonBlockingChannel[*StreamEvent] mainContextEvents chan func() - sinkCount int + sinkCount int // 拉流计数 gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop - recordSink Sink // 每个Source的录制流 - recordFilePath string // 录制流文件路径 - hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟. - _ []transcode.Transcoder // 音频解码器 - _ []transcode.Transcoder // 视频解码器 - originTracks TrackManager // 推流的音视频Streams - allStreamTracks TrackManager // 推流Streams+转码器获得的Stream + recordSink Sink // 每个Source的录制流 + recordFilePath string // 录制流文件路径 + hlsStream TransStream // HLS传输流 + originTracks TrackManager // 推流的原始track + transcodeTracks map[utils.AVCodecID]*TranscodeTrack // 转码Track transStreams map[TransStreamID]TransStream // 所有输出流 forwardTransStream TransStream // 转发流 - sinks map[SinkID]Sink // 保存所有Sink + sinks map[SinkID]Sink // 所有拉流Sink transStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink - existVideo bool // 是否存在视频 - completed atomic.Bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true + hasVideo bool // 是否存在视频 + completed atomic.Bool // 推流track是否解析完毕 closed atomic.Bool streamEndInfo *StreamEndInfo // 之前推流源信息 accumulateTimestamps bool // 是否累加时间戳 timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳 - lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 + lastStreamEndTime time.Time // 最近结束拉流的时间 bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区 } @@ -109,6 +107,7 @@ func (t *transStreamPublisher) run() { t.transStreams = make(map[TransStreamID]TransStream, 10) t.sinks = make(map[SinkID]Sink, 128) t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) + t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4) defer func() { // 清空管道 @@ -137,7 +136,7 @@ func (t *transStreamPublisher) run() { if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascaded { packets := event.Data.([][]byte) for _, data := range packets { - t.DispatchPacket(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]}) + t.DispatchPacketToStream(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]}) UDPReceiveBufferPool.Put(data[:cap(data)]) } } @@ -188,8 +187,7 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() { streams := t.originTracks.All() utils.Assert(len(streams) > 0) - id := GenerateTransStreamID(TransStreamHls, streams...) - hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams, nil) + hlsStream, err := t.CreateTransStream(TransStreamHls, streams, nil) if err != nil { log.Sugar.Errorf("创建HLS输出流失败 source: %s err: %s", t.source, err.Error()) return @@ -197,20 +195,24 @@ func (t *transStreamPublisher) CreateDefaultOutStreams() { t.DispatchGOPBuffer(hlsStream) t.hlsStream = hlsStream - t.transStreams[id] = t.hlsStream } } -func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) { - log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source) - - source := SourceManager.Find(t.source) - utils.Assert(source != nil) - transStream, err := CreateTransStream(source, protocol, tracks, sink) - if err != nil { - return nil, err +// 转码GOPBuffer中的音频 +func (t *transStreamPublisher) transcodeGOPBuffer(track *TranscodeTrack) { + if t.gopBuffer != nil { + t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + if utils.AVMediaTypeAudio != packet.Get().MediaType { + return + } + track.Input(packet.Get()) + }) } +} +func (t *transStreamPublisher) CreateTransStream(protocol TransStreamProtocol, tracks []*Track, sink Sink) (TransStream, error) { + // 匹配和创建适合TransStream流协议的track + var finalTracks []*Track for _, track := range tracks { supportedCodecs, ok := SupportedCodes[protocol] if !ok { @@ -220,28 +222,94 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran _, ok = supportedCodecs[track.Stream.CodecID] if !ok { log.Sugar.Warnf("不支持的编码器 source: %s stream: %s codec: %s", t.source, protocol.String(), track.Stream.CodecID) - continue + // 尝试音频转码 + if utils.AVMediaTypeAudio != track.Stream.MediaType || transcode.CreateAudioTranscoder == nil { + continue + } + + transcodeTrack := t.transcodeTracks[track.Stream.CodecID] + if transcodeTrack == nil { + // 创建音频转码器 + var codecs []utils.AVCodecID + for codec := range SupportedCodes[protocol] { + codecs = append(codecs, codec) + } + + transcoder, stream, err := transcode.CreateAudioTranscoder(track.Stream, codecs) + if err != nil { + log.Sugar.Errorf("创建音频转码器失败 source: %s stream: %s codec: %s err: %s", t.source, protocol.String(), track.Stream.CodecID, err.Error()) + continue + } + + log.Sugar.Infof("创建音频转码器成功 source: %s stream: %s src: %s dst: %s", t.source, protocol.String(), track.Stream.CodecID, transcoder.GetEncoderID()) + + stream.Index = len(t.originTracks.tracks) + len(t.transcodeTracks) + newTrack := &Track{Stream: stream} + + // 如果之前有转码过, 则使用之前的时间戳 + if t.streamEndInfo != nil { + oldTimestamps, ok := t.streamEndInfo.Timestamps[transcoder.GetEncoderID()] + if ok { + newTrack.Dts = oldTimestamps[0] + newTrack.Pts = oldTimestamps[1] + } + } + + transcodeTrack = NewTranscodeTrack(newTrack, transcoder) + t.transcodeTracks[track.Stream.CodecID] = transcodeTrack + + // 转码GOPBuffer中的音频 + t.transcodeGOPBuffer(transcodeTrack) + } + + track = transcodeTrack.track } - var index int // 重新拷贝一个track,传输流内部使用track的时间戳, newTrack := *track - if index, err = transStream.AddTrack(&newTrack); err != nil { + finalTracks = append(finalTracks, &newTrack) + } + + if len(finalTracks) < 1 { + return nil, fmt.Errorf("not found track") + } + + id := GenerateTransStreamID(protocol, finalTracks...) + // 如果已经存在该id的输出流, 则直接返回 + oldTransStream := t.transStreams[id] + if oldTransStream != nil { + return oldTransStream, nil + } + + log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source) + + source := SourceManager.Find(t.source) + utils.Assert(source != nil) + transStream, err := CreateTransStream(source, protocol, tracks, sink) + if err != nil { + return nil, err + } + + for _, track := range finalTracks { + index, err := transStream.AddTrack(track) + if err != nil { log.Sugar.Errorf("添加track失败 err: %s source: %s stream: %s, codec: %s ", err.Error(), t.source, protocol, track.Stream.CodecID) continue } // stream index->muxer track index - transStream.SetMuxerTrack(index, &newTrack) + transStream.SetMuxerTrack(index, track) } if transStream.TrackSize() == 0 { return nil, fmt.Errorf("not found track") } + id = GenerateTransStreamID(protocol, transStream.GetTracks()...) transStream.SetID(id) transStream.SetProtocol(protocol) + t.transStreams[id] = transStream // 创建输出流对应的拉流队列 t.transStreamSinks[id] = make(map[SinkID]Sink, 128) _ = transStream.WriteHeader() @@ -257,16 +325,36 @@ func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol Tran func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) { if t.gopBuffer != nil { t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { - t.DispatchPacket(transStream, packet.Get()) + t.DispatchPacketToStream(transStream, packet.Get()) }) + + // 发送转码包 + for _, track := range t.transcodeTracks { + size := track.packets.Size() + for i := 0; i < size; i++ { + t.DispatchPacketToStream(transStream, track.packets.Peek(i)) + } + } } } -// DispatchPacket 分发AVPacket -func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) { +func (t *transStreamPublisher) DispatchPacket(packet *avformat.AVPacket) { + for _, transStream := range t.transStreams { + if TransStreamGBCascaded == transStream.GetProtocol() { + continue + } + + t.DispatchPacketToStream(transStream, packet) + } +} + +// DispatchPacketToStream 分发AVPacket +func (t *transStreamPublisher) DispatchPacketToStream(transStream TransStream, packet *avformat.AVPacket) { trackIndex, ok := transStream.FindMuxerTrackIndex(packet.Index) if !ok { return + } else if !transStream.GetID().HasTrack(packet.Index) { + return } data, timestamp, videoKey, err := transStream.Input(packet, trackIndex) @@ -280,7 +368,7 @@ func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *a // DispatchBuffer 分发传输流 func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { sinks := t.transStreamSinks[transStream.GetID()] - exist := transStream.IsExistVideo() + exist := transStream.HasVideo() for _, sink := range sinks { @@ -371,20 +459,17 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { tracks = append(tracks, track) } - transStreamId := GenerateTransStreamID(sink.GetProtocol(), tracks...) - transStream, exist := t.transStreams[transStreamId] + transStream, exist := t.transStreams[GenerateTransStreamID(sink.GetProtocol(), tracks...)] if !exist { var err error - transStream, err = t.CreateTransStream(transStreamId, sink.GetProtocol(), tracks, sink) + transStream, err = t.CreateTransStream(sink.GetProtocol(), tracks, sink) if err != nil { log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), t.source) return false } - - t.transStreams[transStreamId] = transStream } - sink.SetTransStreamID(transStreamId) + sink.SetTransStreamID(transStream.GetID()) { sink.Lock() @@ -398,6 +483,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { sink.UnLock() } + // 开始推流 err := sink.StartStreaming(transStream) if err != nil { log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkID2String(sink.GetID()), t.source) @@ -416,7 +502,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { } t.sinks[sink.GetID()] = sink - t.transStreamSinks[transStreamId][sink.GetID()] = sink + t.transStreamSinks[transStream.GetID()][sink.GetID()] = sink // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. _, ok := sink.GetConn().(*transport.Conn) @@ -436,7 +522,7 @@ func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { } // 新建传输流,发送已经缓存的音视频帧 - if !exist && AppConfig.GOPCache && t.existVideo && TransStreamGBCascaded != transStream.GetProtocol() { + if !exist && AppConfig.GOPCache && t.hasVideo && TransStreamGBCascaded != transStream.GetProtocol() { t.DispatchGOPBuffer(transStream) } @@ -516,6 +602,11 @@ func (t *transStreamPublisher) doClose() { t.gopBuffer = nil } + // 关闭转码器 + for _, track := range t.transcodeTracks { + track.Close() + } + // 关闭录制流 if t.recordSink != nil { t.recordSink.Close() @@ -523,7 +614,16 @@ func (t *transStreamPublisher) doClose() { // 保留推流信息 if t.sinkCount > 0 && len(t.originTracks.All()) > 0 { - sourceHistory := StreamEndInfoBride(t.source, t.originTracks.All(), t.transStreams) + var tracks []*Track + for _, track := range t.originTracks.All() { + tracks = append(tracks, track) + } + + for _, track := range t.transcodeTracks { + tracks = append(tracks, track.track) + } + + sourceHistory := StreamEndInfoBride(t.source, tracks, t.transStreams) streamEndInfoManager.Add(sourceHistory) } @@ -582,8 +682,7 @@ func (t *transStreamPublisher) WriteHeader() { t.streamEndInfo = streamInfo // 恢复每路track的时间戳 - tracks := t.originTracks.All() - for _, track := range tracks { + for _, track := range t.originTracks.All() { timestamps := streamInfo.Timestamps[track.Stream.CodecID] track.Dts = timestamps[0] track.Pts = timestamps[1] @@ -613,7 +712,7 @@ func (t *transStreamPublisher) WriteHeader() { } // 如果不存在视频帧, 清空GOP缓存 - if !t.existVideo { + if !t.hasVideo { t.ClearGopBuffer(false) t.gopBuffer = nil } @@ -643,11 +742,16 @@ func (t *transStreamPublisher) ClearGopBuffer(free bool) { t.bitstreamFilterBuffer.Pop() } }) + + // 丢弃转码track中的缓存 + for _, track := range t.transcodeTracks { + track.Clear() + } } func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) { // 保存到GOP缓存 - if (AppConfig.GOPCache && t.existVideo) || !t.completed.Load() { + if (AppConfig.GOPCache && t.hasVideo) || !t.completed.Load() { packet.Get().OnBufferAlloc = func(size int) []byte { if t.bitstreamFilterBuffer == nil { t.bitstreamFilterBuffer = collections.NewRBBlockBuffer(1024 * 1024 * 2) @@ -669,14 +773,18 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av t.CorrectTimestamp(packet.Get()) // 分发给各个传输流 - for _, transStream := range t.transStreams { - if TransStreamGBCascaded != transStream.GetProtocol() { - t.DispatchPacket(transStream, packet.Get()) + t.DispatchPacket(packet.Get()) + + // 转码 + for _, track := range t.transcodeTracks { + transcodePackets := track.Input(packet.Get()) + for _, transcodePkt := range transcodePackets { + t.DispatchPacket(transcodePkt) } } // 未开启GOP缓存或只存在音频流, 立即释放 - if !AppConfig.GOPCache || !t.existVideo { + if !AppConfig.GOPCache || !t.hasVideo { packet.Release() } } @@ -687,7 +795,7 @@ func (t *transStreamPublisher) OnNewTrack(track *Track) { t.originTracks.Add(track) if utils.AVMediaTypeVideo == stream.MediaType { - t.existVideo = true + t.hasVideo = true } // 创建GOPBuffer @@ -704,10 +812,13 @@ func (t *transStreamPublisher) CorrectTimestamp(packet *avformat.AVPacket) { timestamps := t.streamEndInfo.Timestamps[packet.CodecID] t.accumulateTimestamps = true - log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1]) + log.Sugar.Infof("使用上次推流的时间戳 dts: %d, pts: %d", timestamps[0], timestamps[1]) } track := t.originTracks.Find(packet.CodecID) + if track == nil { + return + } duration := packet.GetDuration(packet.Timebase) // 根据duration来累加时间戳 @@ -731,7 +842,7 @@ func (t *transStreamPublisher) GetStreamEndInfo() *StreamEndInfo { } func (t *transStreamPublisher) TranscodeTracks() []*Track { - return t.allStreamTracks.All() + return t.originTracks.All() } func (t *transStreamPublisher) LastStreamEndTime() time.Time { diff --git a/stream/trans_stream.go b/stream/trans_stream.go index b402c68..348b326 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -48,7 +48,7 @@ type TransStream interface { // Close 关闭传输流, 返回还未flush的合并写块 Close() ([]*collections.ReferenceCounter[[]byte], int64, error) - IsExistVideo() bool + HasVideo() bool IsTCPStreaming() bool @@ -60,7 +60,7 @@ type BaseTransStream struct { Tracks []*Track MuxerIndex map[int]int // stream index->muxer track index Completed bool - ExistVideo bool + hasVideo bool Protocol TransStreamProtocol OutBuffer []*collections.ReferenceCounter[[]byte] // 传输流的合并写块队列 @@ -90,7 +90,7 @@ func (t *BaseTransStream) AddTrack(track *Track) (int, error) { func (t *BaseTransStream) SetMuxerTrack(muxerIndex int, track *Track) { t.Tracks = append(t.Tracks, track) if utils.AVMediaTypeVideo == track.Stream.MediaType { - t.ExistVideo = true + t.hasVideo = true } if t.MuxerIndex == nil { @@ -159,8 +159,8 @@ func (t *BaseTransStream) GetTracks() []*Track { return t.Tracks } -func (t *BaseTransStream) IsExistVideo() bool { - return t.ExistVideo +func (t *BaseTransStream) HasVideo() bool { + return t.hasVideo } func (t *BaseTransStream) ReadExtraData(timestamp int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { diff --git a/stream/trans_utils.go b/stream/trans_utils.go index 2aa79ff..df6e7e7 100644 --- a/stream/trans_utils.go +++ b/stream/trans_utils.go @@ -2,32 +2,19 @@ package stream import "github.com/lkmio/avformat/utils" -// TransStreamID 每个传输流的唯一Id,根据输出流协议ID+流包含的音视频编码器ID生成 -// 输出流协议ID占用高8位 +// TransStreamID 每个传输流的唯一Id,根据输出流协议ID+track index生成 +// 输出流协议占低8位 // 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流. type TransStreamID uint64 -var ( - // AVCodecID转为byte的对应关系 - narrowCodecIds map[int]byte -) - -func init() { - narrowCodecIds = map[int]byte{ - int(utils.AVCodecIdH263): 0x1, - int(utils.AVCodecIdH264): 0x2, - int(utils.AVCodecIdH265): 0x3, - int(utils.AVCodecIdAV1): 0x4, - int(utils.AVCodecIdVP8): 0x5, - int(utils.AVCodecIdVP9): 0x6, - - int(utils.AVCodecIdAAC): 101, - int(utils.AVCodecIdMP3): 102, - int(utils.AVCodecIdOPUS): 103, - int(utils.AVCodecIdPCMALAW): 104, - int(utils.AVCodecIdPCMMULAW): 105, - int(utils.AVCodecIdADPCMG722): 106, +func (id TransStreamID) HasTrack(index int) bool { + for i := 1; i < 8; i++ { + if int(id>>(i*8))&0xFF == index { + return true + } } + + return false } // GenerateTransStreamID 根据传入的推拉流协议和编码器ID生成StreamId @@ -54,16 +41,9 @@ func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) Trans len_ := len(tracks) utils.Assert(len_ > 0 && len_ < 8) - var streamId uint64 - streamId = uint64(protocol) << 56 - + var streamId = uint64(protocol) & 0xFF for i, track := range tracks { - id, ok := narrowCodecIds[int(track.Stream.CodecID)] - if ok { - id = byte(track.Stream.CodecID) - } - - streamId |= uint64(id) << (48 - i*8) + streamId |= uint64(track.Stream.Index) << ((i + 1) * 8) } return TransStreamID(streamId) diff --git a/stream/transcode_track.go b/stream/transcode_track.go new file mode 100644 index 0000000..f0214fa --- /dev/null +++ b/stream/transcode_track.go @@ -0,0 +1,74 @@ +package stream + +import ( + "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" + "github.com/lkmio/lkm/transcode" + "sync" +) + +var ( + AudioPacketDataPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 48000) + }, + } +) + +type TranscodeTrack struct { + track *Track + packets *collections.Queue[*avformat.AVPacket] + transcoder transcode.Transcoder + result []*avformat.AVPacket + + dst int64 + pts int64 +} + +func (t *TranscodeTrack) Input(packet *avformat.AVPacket) []*avformat.AVPacket { + t.result = t.result[:0] + + _, _ = t.transcoder.Transcode(packet, func(bytes []byte, duration int) { + dstPkt := avformat.PacketPool.Get().(*avformat.AVPacket) + *dstPkt = *packet + + data := AudioPacketDataPool.Get().([]byte) + copy(data, bytes) + + dstPkt.Data = data[:len(bytes)] + dstPkt.Timebase = 1000 + dstPkt.Pts = t.pts + dstPkt.Dts = t.pts + dstPkt.Duration = int64(duration) + dstPkt.CodecID = t.track.Stream.CodecID + dstPkt.Index = t.track.Stream.Index + + t.pts += dstPkt.Duration + t.dst += dstPkt.Duration + t.packets.Push(dstPkt) + t.result = append(t.result, dstPkt) + }) + + return t.result +} + +func (t *TranscodeTrack) Clear() { + for t.packets.Size() > 0 { + packet := t.packets.Pop() + AudioPacketDataPool.Put(packet.Data[:cap(packet.Data)]) + avformat.FreePacket(packet) + } +} + +func (t *TranscodeTrack) Close() { + t.transcoder.Close() + t.Clear() +} + +func NewTranscodeTrack(track *Track, transcoder transcode.Transcoder) *TranscodeTrack { + return &TranscodeTrack{ + track: track, + transcoder: transcoder, + packets: collections.NewQueue[*avformat.AVPacket](128), + } +} diff --git a/transcode/audio_transcoder.go b/transcode/audio_transcoder.go new file mode 100644 index 0000000..822e48d --- /dev/null +++ b/transcode/audio_transcoder.go @@ -0,0 +1,126 @@ +//go:build audio_transcode +// +build audio_transcode + +package transcode + +import ( + "fmt" + audio_transcoder "github.com/lkmio/audio-transcoder" + "github.com/lkmio/avformat" + "github.com/lkmio/avformat/utils" +) + +func init() { + CreateAudioTranscoder = NewAudioTranscoder +} + +type AudioTranscoder struct { + decoder audio_transcoder.Decoder + encoder audio_transcoder.Encoder + encoderID utils.AVCodecID + // reSampler audio_transcoder.Resampler + pcmData []byte + pktData []byte +} + +func (t *AudioTranscoder) Transcode(src *avformat.AVPacket, cb func([]byte, int)) (int, error) { + if src.MediaType != utils.AVMediaTypeAudio { + return 0, fmt.Errorf("unsupported media type: %s", src.MediaType.String()) + } + + pcmN, err := t.decoder.Decode(src.Data, t.pcmData) + if err != nil { + return 0, err + } else if pcmN < 1 { + return 0, nil + } + + pktN, err := t.encoder.Encode(t.pcmData[:pcmN], func(bytes []byte) { + cb(bytes, t.encoder.PacketDurationMS()) + }) + + return pktN, nil +} + +func (t *AudioTranscoder) Close() { + t.decoder.Destroy() + t.encoder.Destroy() +} + +func (t *AudioTranscoder) GetEncoderID() utils.AVCodecID { + return t.encoderID +} + +func NewAudioTranscoder(src *avformat.AVStream, dst []utils.AVCodecID) (Transcoder, *avformat.AVStream, error) { + decoder := audio_transcoder.FindDecoder(src.CodecID.String()) + if decoder == nil { + return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String()) + } + + var err error + var encoder audio_transcoder.Encoder + var dstCodec utils.AVCodecID + for _, codec := range dst { + encoder, err = audio_transcoder.FindEncoder(codec.String(), src.SampleRate, src.Channels) + if encoder != nil { + dstCodec = codec + break + } + } + + if err != nil { + return nil, nil, err + } else if encoder == nil { + return nil, nil, fmt.Errorf("unsupported audio codec: %s", src.CodecID.String()) + } + + switch src.CodecID { + case utils.AVCodecIdAAC: + if err = decoder.(*audio_transcoder.AACDecoder).Create(nil, src.Data); err != nil { + return nil, nil, err + } + break + case utils.AVCodecIdOPUS: + if err = decoder.(*audio_transcoder.OpusDecoder).Create(src.SampleRate, src.Channels); err != nil { + return nil, nil, err + } + break + } + + adtsHeader := 1 + switch dstCodec { + case utils.AVCodecIdAAC: + if _, err = encoder.(*audio_transcoder.AACEncoder).Create(src.SampleRate, src.Channels, adtsHeader); err != nil { + decoder.Destroy() + return nil, nil, err + } + break + case utils.AVCodecIdOPUS: + if _, err = encoder.(*audio_transcoder.OpusEncoder).Create(src.SampleRate, src.Channels); err != nil { + decoder.Destroy() + return nil, nil, err + } + } + + dstStream := &avformat.AVStream{} + *dstStream = *src + dstStream.CodecID = dstCodec + dstStream.Timebase = 1000 + + if data := encoder.ExtraData(); data != nil { + dstStream.Data = make([]byte, len(data)) + copy(dstStream.Data, data) + } + + if utils.AVCodecIdAAC == dstCodec { + dstStream.HasADTSHeader = adtsHeader == 1 + } + + return &AudioTranscoder{ + decoder: decoder, + encoder: encoder, + pcmData: make([]byte, src.SampleRate*src.Channels*2), + pktData: make([]byte, src.SampleRate*src.Channels*2), + encoderID: dstCodec, + }, dstStream, nil +} diff --git a/transcode/transcoder.go b/transcode/transcoder.go index 7137070..6d01a78 100644 --- a/transcode/transcoder.go +++ b/transcode/transcoder.go @@ -1,4 +1,18 @@ package transcode +import ( + "github.com/lkmio/avformat" + "github.com/lkmio/avformat/utils" +) + +var ( + CreateAudioTranscoder func(src *avformat.AVStream, dst []utils.AVCodecID) (Transcoder, *avformat.AVStream, error) +) + type Transcoder interface { + Transcode(src *avformat.AVPacket, cb func([]byte, int)) (int, error) + + GetEncoderID() utils.AVCodecID + + Close() } diff --git a/web/rtc.html b/web/rtc.html index e05325e..2302681 100644 --- a/web/rtc.html +++ b/web/rtc.html @@ -50,7 +50,7 @@ let remote_view = document.getElementById("videoview"); let source = document.getElementById("source").value; let pc = new RTCPeerConnection(null); - // pc.addTransceiver("audio", {direction: "recvonly"}); + pc.addTransceiver("audio", {direction: "recvonly"}); pc.addTransceiver("video", {direction: "recvonly"}); let offer = await pc.createOffer();