From dbf0e0b070402dea2421515bbf2adb0e92a98f2c Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Fri, 20 May 2022 14:17:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E6=8E=89unknowTrack?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/index.go | 35 +++++++++++++++++++++++-- config/config.go | 2 +- config/record.go | 7 +++++ http.go | 12 +++++++++ io.go | 9 ++++--- publisher.go | 66 +++++++++++++++++++++++++++++++++++++++++++++--- stream.go | 43 ++++++++++++++++++++----------- subscriber.go | 4 +-- summary.go | 14 ++++++---- track/audio.go | 57 ----------------------------------------- track/base.go | 31 ----------------------- track/video.go | 42 ------------------------------ 12 files changed, 160 insertions(+), 162 deletions(-) diff --git a/common/index.go b/common/index.go index 6327bca..a09bf9d 100644 --- a/common/index.go +++ b/common/index.go @@ -6,8 +6,40 @@ import ( "github.com/pion/rtp" ) +// Base 基础Track类 +type Base struct { + Name string + Stream IStream `json:"-"` + ts time.Time + bytes int + frames int + BPS int + FPS int +} + +func (bt *Base) ComputeBPS(bytes int) { + bt.bytes += bytes + bt.frames++ + if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { + bt.BPS = bt.bytes / int(elapse) + bt.FPS = bt.frames / int(elapse) + bt.bytes = 0 + bt.frames = 0 + bt.ts = time.Now() + } +} + +func (bt *Base) GetBase() *Base { + return bt +} + +func (bt *Base) Flush(bf *BaseFrame) { + bt.ComputeBPS(bf.BytesIn) + bf.Timestamp = time.Now() +} + type Track interface { - GetName() string + GetBase() *Base LastWriteTime() time.Time } @@ -37,4 +69,3 @@ type AudioTrack interface { WriteSlice(AudioSlice) WriteADTS([]byte) } - diff --git a/config/config.go b/config/config.go index 1020645..32058d3 100644 --- a/config/config.go +++ b/config/config.go @@ -88,7 +88,7 @@ func (config Config) Unmarshal(s any) { } else { switch fv.Type().Kind() { case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: - fv.SetUint(value.Uint()) + fv.SetUint(uint64(value.Int())) case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: fv.SetInt(value.Int()) case reflect.Float32, reflect.Float64: diff --git a/config/record.go b/config/record.go index 3ee40c3..662a3fc 100644 --- a/config/record.go +++ b/config/record.go @@ -2,6 +2,7 @@ package config import ( "io" + "net/http" "os" "path" "path/filepath" @@ -27,10 +28,15 @@ type Record struct { AutoRecord bool Filter string filterReg *regexp.Regexp + fs http.Handler CreateFileFn func(filename string, append bool) (FileWr, error) `yaml:"-"` GetDurationFn func(file io.ReadSeeker) uint32 `yaml:"-"` } +func (r *Record) ServeHTTP (w http.ResponseWriter, req *http.Request) { + r.fs.ServeHTTP(w, req) +} + func (r *Record) NeedRecord(streamPath string) bool { return r.AutoRecord && (r.filterReg == nil || r.filterReg.MatchString(streamPath)) } @@ -40,6 +46,7 @@ func (r *Record) Init() { if r.Filter != "" { r.filterReg = regexp.MustCompile(r.Filter) } + r.fs = http.FileServer(http.Dir(r.Path)) r.CreateFileFn = func(filename string, append bool) (file FileWr, err error) { filePath := filepath.Join(r.Path, filename) flag := os.O_CREATE diff --git a/http.go b/http.go index 8ada2f0..4181396 100644 --- a/http.go +++ b/http.go @@ -21,6 +21,18 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { util.ReturnJson(summary.collect, time.Second, rw, r) } +func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request) { + if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" { + if s := Streams.Get(streamPath); s != nil { + json.NewEncoder(rw).Encode(s) + } else { + http.Error(rw, "no such stream", http.StatusNotFound) + } + } else { + http.Error(rw, "no streamPath", http.StatusBadRequest) + } +} + func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request) { json.NewEncoder(rw).Encode(&struct { Version string diff --git a/io.go b/io.go index 725448a..0ec24cf 100644 --- a/io.go +++ b/io.go @@ -21,6 +21,7 @@ type ClientConfig interface { config.Pull | config.Push } +// 发布者或者订阅者的共用结构体 type IO[C IOConfig, S IIO] struct { ID string Type string @@ -150,7 +151,7 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error { if create { EventBus <- s // 通知发布者按需拉流 } - EventBus <- specific // 全局广播订阅事件 + EventBus <- specific // 全局广播订阅事件 defer func() { if err == nil { specific.OnEvent(specific) @@ -162,15 +163,15 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error { } return StreamIsClosedErr } - -type Client[C ClientConfig] struct { +// ClientIO 作为Client角色(Puller,Pusher)的公共结构体 +type ClientIO[C ClientConfig] struct { Config *C StreamPath string // 本地流标识 RemoteURL string // 远程服务器地址(用于推拉) ReConnectCount int //重连次数 } -func (c *Client[C]) init(streamPath string, url string, conf *C) { +func (c *ClientIO[C]) init(streamPath string, url string, conf *C) { c.Config = conf c.StreamPath = streamPath c.RemoteURL = url diff --git a/publisher.go b/publisher.go index 6407b9e..fe8a70f 100644 --- a/publisher.go +++ b/publisher.go @@ -4,6 +4,7 @@ import ( "io" "go.uber.org/zap" + "m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec/mpegts" "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" @@ -42,8 +43,7 @@ func (p *Publisher) OnEvent(event any) { switch v := event.(type) { case IPublisher: if p.Equal(v) { //第一任 - p.AudioTrack = p.Stream.NewAudioTrack() - p.VideoTrack = p.Stream.NewVideoTrack() + } else { // 使用前任的track,因为订阅者都挂在前任的上面 p.AudioTrack = v.getAudioTrack() p.VideoTrack = v.getVideoTrack() @@ -53,6 +53,66 @@ func (p *Publisher) OnEvent(event any) { } } +func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) { + if p.VideoTrack == nil { + if frame.IsSequence() { + ts = 0 + codecID := frame.VideoCodecID() + switch codecID { + case codec.CodecID_H264: + p.VideoTrack = track.NewH264(p.Stream) + case codec.CodecID_H265: + p.VideoTrack = track.NewH265(p.Stream) + default: + p.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID))) + return + } + p.VideoTrack.WriteAVCC(ts, frame) + } else { + p.Stream.Warn("need sequence frame") + } + } else { + p.VideoTrack.WriteAVCC(ts, frame) + } +} + +func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) { + if p.AudioTrack == nil { + codecID := frame.AudioCodecID() + switch codecID { + case codec.CodecID_AAC: + if !frame.IsSequence() || len(frame) < 4 { + return + } + a := track.NewAAC(p.Stream) + p.AudioTrack = a + a.SampleSize = 16 + a.AVCCHead = []byte{frame[0], 1} + a.WriteAVCC(0, frame) + case codec.CodecID_PCMA, + codec.CodecID_PCMU: + alaw := true + if codecID == codec.CodecID_PCMU { + alaw = false + } + a := track.NewG711(p.Stream, alaw) + p.AudioTrack = a + a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) + a.SampleSize = 16 + if frame[0]&0x02 == 0 { + a.SampleSize = 8 + } + a.Channels = frame[0]&0x01 + 1 + a.AVCCHead = frame[:1] + p.AudioTrack.WriteAVCC(ts, frame) + default: + p.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID))) + } + } else { + p.AudioTrack.WriteAVCC(ts, frame) + } +} + type IPuller interface { IPublisher Connect() error @@ -63,7 +123,7 @@ type IPuller interface { // 用于远程拉流的发布者 type Puller struct { - Client[config.Pull] + ClientIO[config.Pull] } // 是否需要重连 diff --git a/stream.go b/stream.go index f4e5592..4d77687 100644 --- a/stream.go +++ b/stream.go @@ -137,6 +137,32 @@ type Stream struct { AppName string StreamName string } +type StreamSummay struct { + Path string + State StreamState + Subscribers int + Tracks []string + StartTime int64 + Type string + BPS int +} + +// Summary 返回流的简要信息 +func (s *Stream) Summary() (r StreamSummay) { + if s.Publisher != nil { + r.Type = s.Publisher.GetIO().Type + } + //TODO: Lock + for _, t := range s.Tracks { + r.BPS += t.GetBase().BPS + r.Tracks = append(r.Tracks, t.GetBase().Name) + } + r.Path = s.Path + r.State = s.State + r.Subscribers = len(s.Subscribers) + r.StartTime = s.StartTime.Unix() + return +} func (s *Stream) SSRC() uint32 { return uint32(uintptr(unsafe.Pointer(s))) @@ -329,14 +355,14 @@ func (s *Stream) run() { s.action(ACTION_FIRSTENTER) } case Track: - name := v.GetName() + name := v.GetBase().Name if _, ok := s.Tracks[name]; !ok { s.Tracks[name] = v s.Info("track +1", zap.String("name", name)) s.broadcast(v) } case TrackRemoved: - name := v.GetName() + name := v.GetBase().Name if t, ok := s.Tracks[name]; ok { s.Info("track -1", zap.String("name", name)) delete(s.Tracks, name) @@ -380,19 +406,6 @@ func (s *Stream) RemoveTrack(t Track) { s.Receive(TrackRemoved{t}) } -// 如果暂时不知道编码格式可以用这个 -func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) { - vt = &track.UnknowVideo{} - vt.Stream = r - return -} - -func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) { - at = &track.UnknowAudio{} - at.Stream = r - return -} - func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data) { dt = &track.Data{ Locker: locker, diff --git a/subscriber.go b/subscriber.go index 663586b..4c14523 100644 --- a/subscriber.go +++ b/subscriber.go @@ -138,7 +138,7 @@ func (s *Subscriber) AddTrack(t Track) bool { default: return false } - s.Info("track+1", zap.String("name", t.GetName())) + s.Info("track+1", zap.String("name", t.GetBase().Name)) return true } @@ -335,7 +335,7 @@ type IPusher interface { Reconnect() bool } type Pusher struct { - Client[config.Push] + ClientIO[config.Push] } // 是否需要重连 diff --git a/summary.go b/summary.go index d99efb2..24df674 100644 --- a/summary.go +++ b/summary.go @@ -37,7 +37,7 @@ type Summary struct { Usage float64 } NetWork []NetWorkInfo - Streams []*Stream + Streams []StreamSummay lastNetWork []net.IOCountersStat ref int32 } @@ -62,6 +62,7 @@ func (s *Summary) Start() { func (s *Summary) Point() *Summary { return s } + // Running 是否正在采集数据 func (s *Summary) Running() bool { return s.ref > 0 @@ -87,7 +88,7 @@ func (s *Summary) Report(slave *Summary) { children.Set(slave.Address, slave) } -func (s *Summary) collect() *Summary{ +func (s *Summary) collect() *Summary { v, _ := mem.VirtualMemory() d, _ := disk.Usage("/") nv, _ := net.IOCounters(true) @@ -107,9 +108,9 @@ func (s *Summary) collect() *Summary{ s.NetWork = []NetWorkInfo{} for i, n := range nv { info := NetWorkInfo{ - Name: n.Name, + Name: n.Name, Receive: n.BytesRecv, - Sent: n.BytesSent, + Sent: n.BytesSent, } if s.lastNetWork != nil && len(s.lastNetWork) > i { info.ReceiveSpeed = n.BytesRecv - s.lastNetWork[i].BytesRecv @@ -118,6 +119,9 @@ func (s *Summary) collect() *Summary{ s.NetWork = append(s.NetWork, info) } s.lastNetWork = nv - s.Streams = Streams.ToList() + s.Streams = nil + Streams.Range(func(ss *Stream) { + s.Streams = append(s.Streams, ss.Summary()) + }) return s } diff --git a/track/audio.go b/track/audio.go index 7312596..60742be 100644 --- a/track/audio.go +++ b/track/audio.go @@ -3,7 +3,6 @@ package track import ( "net" - "go.uber.org/zap" "m7s.live/engine/v4/codec" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" @@ -88,59 +87,3 @@ func (a *Audio) Flush() { } a.Media.Flush() } - -type UnknowAudio struct { - Base - AudioTrack -} - -func (ua *UnknowAudio) GetName() string { - return ua.Base.GetName() -} - -func (ua *UnknowAudio) Flush() { - ua.AudioTrack.Flush() -} - -func (ua *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) { - if ua.AudioTrack == nil { - codecID := frame.AudioCodecID() - switch codecID { - case codec.CodecID_AAC: - if !frame.IsSequence() || len(frame) < 4 { - return - } - if ua.Name == "" { - ua.Name = codecID.String() - } - a := NewAAC(ua.Stream) - ua.AudioTrack = a - a.SampleSize = 16 - a.AVCCHead = []byte{frame[0], 1} - a.WriteAVCC(0, frame) - case codec.CodecID_PCMA, - codec.CodecID_PCMU: - if ua.Name == "" { - ua.Name = codecID.String() - } - alaw := true - if codecID == codec.CodecID_PCMU { - alaw = false - } - a := NewG711(ua.Stream, alaw) - ua.AudioTrack = a - a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2]) - a.SampleSize = 16 - if frame[0]&0x02 == 0 { - a.SampleSize = 8 - } - a.Channels = frame[0]&0x01 + 1 - a.AVCCHead = frame[:1] - ua.AudioTrack.WriteAVCC(ts, frame) - default: - ua.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID))) - } - } else { - ua.AudioTrack.WriteAVCC(ts, frame) - } -} diff --git a/track/base.go b/track/base.go index b1f1911..b5dbd53 100644 --- a/track/base.go +++ b/track/base.go @@ -10,37 +10,6 @@ import ( "m7s.live/engine/v4/util" ) -// Base 基础Track类 -type Base struct { - Name string - Stream IStream `json:"-"` - ts time.Time - bytes int - frames int - BPS int - FPS int -} - -func (bt *Base) ComputeBPS(bytes int) { - bt.bytes += bytes - bt.frames++ - if elapse := time.Since(bt.ts).Seconds(); elapse > 1 { - bt.BPS = bt.bytes / int(elapse) - bt.FPS = bt.frames / int(elapse) - bt.bytes = 0 - bt.frames = 0 - bt.ts = time.Now() - } -} -func (bt *Base) GetName() string { - return bt.Name -} - -func (bt *Base) Flush(bf *BaseFrame) { - bt.ComputeBPS(bf.BytesIn) - bf.Timestamp = time.Now() -} - type 流速控制 struct { 起始时间戳 uint32 起始时间 time.Time diff --git a/track/video.go b/track/video.go index 00ea917..9c81e57 100644 --- a/track/video.go +++ b/track/video.go @@ -162,19 +162,6 @@ func (vt *Video) ReadRing() *AVRing[NALUSlice] { return vr } -type UnknowVideo struct { - Base - VideoTrack -} - -func (uv *UnknowVideo) GetName() string { - return uv.Base.GetName() -} - -func (uv *UnknowVideo) Flush() { - uv.VideoTrack.Flush() -} - /* Access Unit的首个nalu是4字节起始码。 这里举个例子说明,用JM可以生成这样一段码流(不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件: @@ -190,32 +177,3 @@ Access Unit的首个nalu是4字节起始码。 I0(slice0)是序列第一帧(I帧)的第一个slice,是当前Access Unit的首个nalu,所以是4字节头。而I0(slice1)表示第一帧的第二个slice,所以是3字节头。P1(slice0) 、P1(slice1)同理。 */ -func (vt *UnknowVideo) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) { - -} - -func (vt *UnknowVideo) WriteAVCC(ts uint32, frame AVCCFrame) { - if vt.VideoTrack == nil { - if frame.IsSequence() { - ts = 0 - codecID := frame.VideoCodecID() - if vt.Name == "" { - vt.Name = codecID.String() - } - switch codecID { - case codec.CodecID_H264: - vt.VideoTrack = NewH264(vt.Stream) - case codec.CodecID_H265: - vt.VideoTrack = NewH265(vt.Stream) - default: - vt.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID))) - return - } - vt.VideoTrack.WriteAVCC(ts, frame) - } else { - vt.Stream.Warn("need sequence frame") - } - } else { - vt.VideoTrack.WriteAVCC(ts, frame) - } -}