package engine

import (
	"bufio"
	"context"
	"io"
	"net"
	"strconv"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"m7s.live/engine/v4/codec"
	. "m7s.live/engine/v4/common"
	"m7s.live/engine/v4/config"
	"m7s.live/engine/v4/track"
	"m7s.live/engine/v4/util"
)

const (
	SUBTYPE_RAW = iota
	SUBTYPE_RTP
	SUBTYPE_FLV
)

const (
	SUBSTATE_INIT = iota
	SUBSTATE_FIRST
	SUBSTATE_NORMAL
)

// VideoDeConf is a sequence frame (decoder configuration) in AVCC format.
type VideoDeConf []byte

// AudioDeConf is a sequence frame (decoder configuration) in AVCC format.
type AudioDeConf []byte

type AudioFrame struct {
	*AVFrame
	*track.Audio
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

type VideoFrame struct {
	*AVFrame
	*track.Video
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

type FLVFrame net.Buffers
type AudioRTP RTPFrame
type VideoRTP RTPFrame

type HasAnnexB interface {
	GetAnnexB() (r net.Buffers)
}

func (a AudioDeConf) WithOutRTMP() []byte {
	return a[2:]
}

func (v VideoDeConf) WithOutRTMP() []byte {
	return v[5:]
}

func (f FLVFrame) IsAudio() bool {
	return f[0][0] == codec.FLV_TAG_TYPE_AUDIO
}

func (f FLVFrame) IsVideo() bool {
	return f[0][0] == codec.FLV_TAG_TYPE_VIDEO
}

func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
	t := (net.Buffers)(f)
	return t.WriteTo(w)
}

func (a AudioFrame) GetADTS() (r net.Buffers) {
	r = append(append(r, a.ADTS.Value), a.AUList.ToBuffers()...)
	return
}

func (a AudioFrame) WriteRawTo(w io.Writer) (n int64, err error) {
	aulist := a.AUList.ToBuffers()
	return aulist.WriteTo(w)
}

func (v VideoFrame) GetAnnexB() (r net.Buffers) {
	if v.IFrame {
		r = v.ParamaterSets.GetAnnexB()
	}
	v.AUList.Range(func(au *util.BLL) bool {
		r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...)
		return true
	})
	return
}

func (v VideoFrame) WriteAnnexBTo(w io.Writer) (n int64, err error) {
	annexB := v.GetAnnexB()
	return annexB.WriteTo(w)
}

type ISubscriber interface {
	IIO
	GetSubscriber() *Subscriber
	IsPlaying() bool
	PlayRaw()
	PlayBlock(byte)
	PlayFLV()
	Stop(reason ...zapcore.Field)
	Subscribe(streamPath string, sub ISubscriber) error
}

type TrackPlayer struct {
	context.Context
	context.CancelFunc
	AudioReader, VideoReader *track.AVRingReader
	Audio                    *track.Audio
	Video                    *track.Video
}

func (player *TrackPlayer) StopPlay() {
	player.CancelFunc()
	if player.AudioReader != nil {
		player.AudioReader.StopRead()
	}
	if player.VideoReader != nil {
		player.VideoReader.StopRead()
	}
}

// Subscriber is the subscriber entity definition.
type Subscriber struct {
	IO
	Config      *config.Subscribe
	readers     []*track.AVRingReader
	TrackPlayer `json:"-" yaml:"-"`
}

func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error {
	return s.receive(streamPath, sub)
}

func (s *Subscriber) GetSubscriber() *Subscriber {
	return s
}

func (s *Subscriber) SetIO(i any) {
	s.IO.SetIO(i)
	if s.Writer != nil && s.Config != nil && s.Config.WriteBufferSize > 0 {
		s.Writer = bufio.NewWriterSize(s.Writer, s.Config.WriteBufferSize)
	}
}

func (s *Subscriber) OnEvent(event any) {
	switch v := event.(type) {
	case Track: // accept all tracks by default
		s.AddTrack(v)
	default:
		s.IO.OnEvent(event)
	}
}

func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
	result = track.NewAVRingReader(t)
	s.readers = append(s.readers, result)
	result.Logger = s.With(zap.String("track", t.Name))
	return
}

func (s *Subscriber) AddTrack(t Track) bool {
	switch v := t.(type) {
	case *track.Video:
		if !s.Config.SubVideo {
			return false
		}
		var startTs time.Duration
		if s.VideoReader != nil {
			startTs = time.Duration(s.VideoReader.AbsTime) * time.Millisecond
			s.VideoReader.StopRead()
		}
		s.VideoReader = s.CreateTrackReader(&v.Media)
		s.VideoReader.StartTs = startTs
		s.Video = v
	case *track.Audio:
		if !s.Config.SubAudio {
			return false
		}
		var startTs time.Duration
		if s.AudioReader != nil {
			startTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond
			s.AudioReader.StopRead()
		}
		s.AudioReader = s.CreateTrackReader(&v.Media)
		s.AudioReader.StartTs = startTs
		s.Audio = v
	default:
		return false
	}
	s.Info("track+1", zap.String("name", t.GetName()))
	return true
}

func (s *Subscriber) IsPlaying() bool {
	return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
}

func (s *Subscriber) SubPulse() {
	s.Stream.Receive(SubPulse{s.Spesific.(ISubscriber)})
}

func (s *Subscriber) PlayRaw() {
	s.PlayBlock(SUBTYPE_RAW)
}

func (s *Subscriber) PlayFLV() {
	s.PlayBlock(SUBTYPE_FLV)
}

func (s *Subscriber) PlayRTP() {
	s.PlayBlock(SUBTYPE_RTP)
}

// PlayBlock reads data in a blocking manner until the subscription stops.
func (s *Subscriber) PlayBlock(subType byte) {
	spesic := s.Spesific
	if spesic == nil {
		s.Error("play before subscribe")
		return
	}
	if s.IO.Err() != nil {
		s.Error("play", zap.Error(s.IO.Err()))
		return
	}
	s.Info("playblock", zap.Uint8("subType", subType))
	s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
	ctx := s.TrackPlayer.Context
	conf := s.Config
	hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio
	stopReason := zap.String("reason", "stop")
	defer s.onStop(&stopReason)
	if !hasAudio && !hasVideo {
		stopReason = zap.String("reason", "play neither video nor audio")
		return
	}
	sendVideoDecConf := func() {
		// s.Debug("sendVideoDecConf")
		spesic.OnEvent(s.Video.ParamaterSets)
		spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
	}
	sendAudioDecConf := func() {
		// s.Debug("sendAudioDecConf")
		spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
	}
	var sendAudioFrame, sendVideoFrame func(*AVFrame)
	switch subType {
	case SUBTYPE_RAW:
		sendVideoFrame = func(frame *AVFrame) {
			if frame.AUList.ByteLength == 0 {
				return
			}
			spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
		}
		sendAudioFrame = func(frame *AVFrame) {
			if frame.AUList.ByteLength == 0 {
				return
			}
			// fmt.Println("a", s.AudioReader.Delay)
			// fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime)
			spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
		}
	case SUBTYPE_RTP:
		var videoSeq, audioSeq uint16
		sendVideoFrame = func(frame *AVFrame) {
			// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
			// rebase RTP timestamps against SkipTs (90 kHz video clock)
			delta := uint32(s.VideoReader.SkipTs * 90 / time.Millisecond)
			frame.RTP.Range(func(vp RTPFrame) bool {
				videoSeq++
				copy := *vp.Packet
				vp.Packet = &copy
				vp.Header.Timestamp = vp.Header.Timestamp - delta
				vp.Header.SequenceNumber = videoSeq
				spesic.OnEvent((VideoRTP)(vp))
				return true
			})
		}
		sendAudioFrame = func(frame *AVFrame) {
			// fmt.Println("a", frame.Sequence, frame.Timestamp, s.AudioReader.AbsTime)
			// rebase RTP timestamps against SkipTs using the audio track's sample rate
			delta := uint32(s.AudioReader.SkipTs / time.Millisecond * time.Duration(s.AudioReader.Track.SampleRate) / 1000)
			frame.RTP.Range(func(ap RTPFrame) bool {
				audioSeq++
				copy := *ap.Packet
				ap.Packet = &copy
				ap.Header.SequenceNumber = audioSeq
				ap.Header.Timestamp = ap.Header.Timestamp - delta
				spesic.OnEvent((AudioRTP)(ap))
				return true
			})
		}
	case SUBTYPE_FLV:
		flvHeadCache := make([]byte, 15) // reuse memory for the FLV tag header
		// sendFlvFrame builds the 11-byte FLV tag header in flvHeadCache, appends the
		// AVCC payload, and trails with the 4-byte PreviousTagSize (dataSize+11).
		sendFlvFrame := func(t byte, ts uint32, avcc ...[]byte) {
			// println(t, ts)
			// fmt.Printf("%d %X %X %d\n", t, avcc[0][0], avcc[0][1], ts)
			flvHeadCache[0] = t
			result := append(FLVFrame{flvHeadCache[:11]}, avcc...)
			dataSize := uint32(util.SizeOfBuffers(avcc))
			if dataSize == 0 {
				return
			}
			util.PutBE(flvHeadCache[1:4], dataSize)
			util.PutBE(flvHeadCache[4:7], ts)
			flvHeadCache[7] = byte(ts >> 24)
			spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11)))
		}
		sendVideoDecConf = func() {
			sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, s.VideoReader.Track.SequenceHead)
		}
		sendAudioDecConf = func() {
			sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
		}
		sendVideoFrame = func(frame *AVFrame) {
			// fmt.Println(frame.Sequence, s.VideoReader.AbsTime, s.VideoReader.Delay, frame.IFrame)
			// b := util.Buffer(frame.AVCC.ToBytes()[5:])
			// for b.CanRead() {
			// 	nalulen := int(b.ReadUint32())
			// 	if b.CanReadN(nalulen) {
			// 		bb := b.ReadN(int(nalulen))
			// 		println(nalulen, codec.ParseH264NALUType(bb[0]))
			// 	} else {
			// 		println("error")
			// 	}
			// }
			sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
		}
		sendAudioFrame = func(frame *AVFrame) {
			// fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
			sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
		}
	}
	var subMode = conf.SubMode // subscription mode
	if s.Args.Has(conf.SubModeArgName) {
		subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName))
	}
	var initState = 0
	var videoFrame, audioFrame *AVFrame
	// Main loop: interleave video and audio reading according to SyncMode until the context is cancelled.
	for ctx.Err() == nil {
		if hasVideo {
			for ctx.Err() == nil {
				err := s.VideoReader.ReadFrame(subMode)
				if err == nil {
					err = ctx.Err()
				}
				if err != nil {
					stopReason = zap.Error(err)
					return
				}
				videoFrame = s.VideoReader.Value
				// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
				if videoFrame.IFrame && s.VideoReader.DecConfChanged() {
					s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
					sendVideoDecConf()
				}
				if hasAudio {
					if audioFrame != nil {
						if util.Conditoinal(conf.SyncMode == 0, videoFrame.Timestamp > audioFrame.Timestamp, videoFrame.WriteTime.After(audioFrame.WriteTime)) {
							// fmt.Println("switch audio", audioFrame.CanRead)
							sendAudioFrame(audioFrame)
							audioFrame = nil
							break
						}
					} else if initState++; initState >= 2 {
						break
					}
				}
				if !conf.IFrameOnly || videoFrame.IFrame {
					sendVideoFrame(videoFrame)
				} else {
					// fmt.Println("skip video", frame.Sequence)
				}
			}
		}
		// In normal mode or audio-only mode, audio playback starts here.
		if hasAudio {
			for ctx.Err() == nil {
				switch s.AudioReader.State {
				case track.READSTATE_INIT:
					if s.Video != nil {
						s.AudioReader.FirstTs = s.VideoReader.FirstTs
					}
				case track.READSTATE_NORMAL:
					if s.Video != nil {
						s.AudioReader.SkipTs = s.VideoReader.SkipTs
					}
				}
				err := s.AudioReader.ReadFrame(subMode)
				if err == nil {
					err = ctx.Err()
				}
				if err != nil {
					stopReason = zap.Error(err)
					return
				}
				audioFrame = s.AudioReader.Value
				// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
				if s.AudioReader.DecConfChanged() {
					s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
					sendAudioDecConf()
				}
				if hasVideo && videoFrame != nil {
					if util.Conditoinal(conf.SyncMode == 0, audioFrame.Timestamp > videoFrame.Timestamp, audioFrame.WriteTime.After(videoFrame.WriteTime)) {
						sendVideoFrame(videoFrame)
						videoFrame = nil
						break
					}
				}
				if audioFrame.Timestamp >= s.AudioReader.SkipTs {
					sendAudioFrame(audioFrame)
				} else {
					// fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)
				}
			}
		}
	}
	stopReason = zap.Error(ctx.Err())
}

func (s *Subscriber) onStop(reason *zapcore.Field) {
	s.StopPlay()
	if !s.Stream.IsClosed() {
		s.Stop(*reason)
		if s.Config.Internal {
			s.Stream.Receive(s.Spesific)
		}
	}
}
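
// The sketch below is illustrative only and not part of the engine API: a minimal
// subscriber that embeds Subscriber and overrides OnEvent to handle the events
// emitted by PlayBlock in SUBTYPE_RAW mode. The type name is an assumption made
// for the example; in practice a plugin attaches the subscriber to a stream
// (via Subscribe) before calling PlayRaw, which blocks until the stream closes
// or Stop is called.
type exampleRawSubscriber struct {
	Subscriber
}

func (r *exampleRawSubscriber) OnEvent(event any) {
	switch v := event.(type) {
	case AudioDeConf:
		// audio sequence frame (AVCC), sent whenever the decoder configuration changes
		_ = v
	case VideoDeConf:
		// video sequence frame (AVCC), sent whenever the decoder configuration changes
		_ = v
	case AudioFrame:
		// one audio frame; AbsTime is the absolute timestamp in milliseconds
		_ = v.AbsTime
	case VideoFrame:
		// one video frame with PTS/DTS as produced by the track reader
		_ = v.AbsTime
	default:
		// fall back to the embedded Subscriber, which handles Track events and the rest
		r.Subscriber.OnEvent(event)
	}
}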