去掉unknowTrack

This commit is contained in:
dexter
2022-05-20 14:17:05 +08:00
parent 48b0fc9768
commit dbf0e0b070
12 changed files with 160 additions and 162 deletions

View File

@@ -6,8 +6,40 @@ import (
"github.com/pion/rtp"
)
// Base 基础Track类
type Base struct {
Name string
Stream IStream `json:"-"`
ts time.Time
bytes int
frames int
BPS int
FPS int
}
func (bt *Base) ComputeBPS(bytes int) {
bt.bytes += bytes
bt.frames++
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
bt.BPS = bt.bytes / int(elapse)
bt.FPS = bt.frames / int(elapse)
bt.bytes = 0
bt.frames = 0
bt.ts = time.Now()
}
}
func (bt *Base) GetBase() *Base {
return bt
}
func (bt *Base) Flush(bf *BaseFrame) {
bt.ComputeBPS(bf.BytesIn)
bf.Timestamp = time.Now()
}
type Track interface {
GetName() string
GetBase() *Base
LastWriteTime() time.Time
}
@@ -37,4 +69,3 @@ type AudioTrack interface {
WriteSlice(AudioSlice)
WriteADTS([]byte)
}

View File

@@ -88,7 +88,7 @@ func (config Config) Unmarshal(s any) {
} else {
switch fv.Type().Kind() {
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
fv.SetUint(value.Uint())
fv.SetUint(uint64(value.Int()))
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
fv.SetInt(value.Int())
case reflect.Float32, reflect.Float64:

View File

@@ -2,6 +2,7 @@ package config
import (
"io"
"net/http"
"os"
"path"
"path/filepath"
@@ -27,10 +28,15 @@ type Record struct {
AutoRecord bool
Filter string
filterReg *regexp.Regexp
fs http.Handler
CreateFileFn func(filename string, append bool) (FileWr, error) `yaml:"-"`
GetDurationFn func(file io.ReadSeeker) uint32 `yaml:"-"`
}
func (r *Record) ServeHTTP (w http.ResponseWriter, req *http.Request) {
r.fs.ServeHTTP(w, req)
}
func (r *Record) NeedRecord(streamPath string) bool {
return r.AutoRecord && (r.filterReg == nil || r.filterReg.MatchString(streamPath))
}
@@ -40,6 +46,7 @@ func (r *Record) Init() {
if r.Filter != "" {
r.filterReg = regexp.MustCompile(r.Filter)
}
r.fs = http.FileServer(http.Dir(r.Path))
r.CreateFileFn = func(filename string, append bool) (file FileWr, err error) {
filePath := filepath.Join(r.Path, filename)
flag := os.O_CREATE

12
http.go
View File

@@ -21,6 +21,18 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) {
util.ReturnJson(summary.collect, time.Second, rw, r)
}
func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request) {
if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" {
if s := Streams.Get(streamPath); s != nil {
json.NewEncoder(rw).Encode(s)
} else {
http.Error(rw, "no such stream", http.StatusNotFound)
}
} else {
http.Error(rw, "no streamPath", http.StatusBadRequest)
}
}
func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request) {
json.NewEncoder(rw).Encode(&struct {
Version string

9
io.go
View File

@@ -21,6 +21,7 @@ type ClientConfig interface {
config.Pull | config.Push
}
// 发布者或者订阅者的共用结构体
type IO[C IOConfig, S IIO] struct {
ID string
Type string
@@ -150,7 +151,7 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
if create {
EventBus <- s // 通知发布者按需拉流
}
EventBus <- specific // 全局广播订阅事件
EventBus <- specific // 全局广播订阅事件
defer func() {
if err == nil {
specific.OnEvent(specific)
@@ -162,15 +163,15 @@ func (io *IO[C, S]) receive(streamPath string, specific S, conf *C) error {
}
return StreamIsClosedErr
}
type Client[C ClientConfig] struct {
// ClientIO 作为Client角色(PullerPusher)的公共结构体
type ClientIO[C ClientConfig] struct {
Config *C
StreamPath string // 本地流标识
RemoteURL string // 远程服务器地址(用于推拉)
ReConnectCount int //重连次数
}
func (c *Client[C]) init(streamPath string, url string, conf *C) {
func (c *ClientIO[C]) init(streamPath string, url string, conf *C) {
c.Config = conf
c.StreamPath = streamPath
c.RemoteURL = url

View File

@@ -4,6 +4,7 @@ import (
"io"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
@@ -42,8 +43,7 @@ func (p *Publisher) OnEvent(event any) {
switch v := event.(type) {
case IPublisher:
if p.Equal(v) { //第一任
p.AudioTrack = p.Stream.NewAudioTrack()
p.VideoTrack = p.Stream.NewVideoTrack()
} else { // 使用前任的track因为订阅者都挂在前任的上面
p.AudioTrack = v.getAudioTrack()
p.VideoTrack = v.getVideoTrack()
@@ -53,6 +53,66 @@ func (p *Publisher) OnEvent(event any) {
}
}
func (p *Publisher) WriteAVCCVideo(ts uint32, frame common.AVCCFrame) {
if p.VideoTrack == nil {
if frame.IsSequence() {
ts = 0
codecID := frame.VideoCodecID()
switch codecID {
case codec.CodecID_H264:
p.VideoTrack = track.NewH264(p.Stream)
case codec.CodecID_H265:
p.VideoTrack = track.NewH265(p.Stream)
default:
p.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID)))
return
}
p.VideoTrack.WriteAVCC(ts, frame)
} else {
p.Stream.Warn("need sequence frame")
}
} else {
p.VideoTrack.WriteAVCC(ts, frame)
}
}
func (p *Publisher) WriteAVCCAudio(ts uint32, frame common.AVCCFrame) {
if p.AudioTrack == nil {
codecID := frame.AudioCodecID()
switch codecID {
case codec.CodecID_AAC:
if !frame.IsSequence() || len(frame) < 4 {
return
}
a := track.NewAAC(p.Stream)
p.AudioTrack = a
a.SampleSize = 16
a.AVCCHead = []byte{frame[0], 1}
a.WriteAVCC(0, frame)
case codec.CodecID_PCMA,
codec.CodecID_PCMU:
alaw := true
if codecID == codec.CodecID_PCMU {
alaw = false
}
a := track.NewG711(p.Stream, alaw)
p.AudioTrack = a
a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2])
a.SampleSize = 16
if frame[0]&0x02 == 0 {
a.SampleSize = 8
}
a.Channels = frame[0]&0x01 + 1
a.AVCCHead = frame[:1]
p.AudioTrack.WriteAVCC(ts, frame)
default:
p.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID)))
}
} else {
p.AudioTrack.WriteAVCC(ts, frame)
}
}
type IPuller interface {
IPublisher
Connect() error
@@ -63,7 +123,7 @@ type IPuller interface {
// 用于远程拉流的发布者
type Puller struct {
Client[config.Pull]
ClientIO[config.Pull]
}
// 是否需要重连

View File

@@ -137,6 +137,32 @@ type Stream struct {
AppName string
StreamName string
}
type StreamSummay struct {
Path string
State StreamState
Subscribers int
Tracks []string
StartTime int64
Type string
BPS int
}
// Summary 返回流的简要信息
func (s *Stream) Summary() (r StreamSummay) {
if s.Publisher != nil {
r.Type = s.Publisher.GetIO().Type
}
//TODO: Lock
for _, t := range s.Tracks {
r.BPS += t.GetBase().BPS
r.Tracks = append(r.Tracks, t.GetBase().Name)
}
r.Path = s.Path
r.State = s.State
r.Subscribers = len(s.Subscribers)
r.StartTime = s.StartTime.Unix()
return
}
func (s *Stream) SSRC() uint32 {
return uint32(uintptr(unsafe.Pointer(s)))
@@ -329,14 +355,14 @@ func (s *Stream) run() {
s.action(ACTION_FIRSTENTER)
}
case Track:
name := v.GetName()
name := v.GetBase().Name
if _, ok := s.Tracks[name]; !ok {
s.Tracks[name] = v
s.Info("track +1", zap.String("name", name))
s.broadcast(v)
}
case TrackRemoved:
name := v.GetName()
name := v.GetBase().Name
if t, ok := s.Tracks[name]; ok {
s.Info("track -1", zap.String("name", name))
delete(s.Tracks, name)
@@ -380,19 +406,6 @@ func (s *Stream) RemoveTrack(t Track) {
s.Receive(TrackRemoved{t})
}
// 如果暂时不知道编码格式可以用这个
func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo) {
vt = &track.UnknowVideo{}
vt.Stream = r
return
}
func (r *Stream) NewAudioTrack() (at *track.UnknowAudio) {
at = &track.UnknowAudio{}
at.Stream = r
return
}
func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data) {
dt = &track.Data{
Locker: locker,

View File

@@ -138,7 +138,7 @@ func (s *Subscriber) AddTrack(t Track) bool {
default:
return false
}
s.Info("track+1", zap.String("name", t.GetName()))
s.Info("track+1", zap.String("name", t.GetBase().Name))
return true
}
@@ -335,7 +335,7 @@ type IPusher interface {
Reconnect() bool
}
type Pusher struct {
Client[config.Push]
ClientIO[config.Push]
}
// 是否需要重连

View File

@@ -37,7 +37,7 @@ type Summary struct {
Usage float64
}
NetWork []NetWorkInfo
Streams []*Stream
Streams []StreamSummay
lastNetWork []net.IOCountersStat
ref int32
}
@@ -62,6 +62,7 @@ func (s *Summary) Start() {
func (s *Summary) Point() *Summary {
return s
}
// Running 是否正在采集数据
func (s *Summary) Running() bool {
return s.ref > 0
@@ -87,7 +88,7 @@ func (s *Summary) Report(slave *Summary) {
children.Set(slave.Address, slave)
}
func (s *Summary) collect() *Summary{
func (s *Summary) collect() *Summary {
v, _ := mem.VirtualMemory()
d, _ := disk.Usage("/")
nv, _ := net.IOCounters(true)
@@ -107,9 +108,9 @@ func (s *Summary) collect() *Summary{
s.NetWork = []NetWorkInfo{}
for i, n := range nv {
info := NetWorkInfo{
Name: n.Name,
Name: n.Name,
Receive: n.BytesRecv,
Sent: n.BytesSent,
Sent: n.BytesSent,
}
if s.lastNetWork != nil && len(s.lastNetWork) > i {
info.ReceiveSpeed = n.BytesRecv - s.lastNetWork[i].BytesRecv
@@ -118,6 +119,9 @@ func (s *Summary) collect() *Summary{
s.NetWork = append(s.NetWork, info)
}
s.lastNetWork = nv
s.Streams = Streams.ToList()
s.Streams = nil
Streams.Range(func(ss *Stream) {
s.Streams = append(s.Streams, ss.Summary())
})
return s
}

View File

@@ -3,7 +3,6 @@ package track
import (
"net"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
@@ -88,59 +87,3 @@ func (a *Audio) Flush() {
}
a.Media.Flush()
}
type UnknowAudio struct {
Base
AudioTrack
}
func (ua *UnknowAudio) GetName() string {
return ua.Base.GetName()
}
func (ua *UnknowAudio) Flush() {
ua.AudioTrack.Flush()
}
func (ua *UnknowAudio) WriteAVCC(ts uint32, frame AVCCFrame) {
if ua.AudioTrack == nil {
codecID := frame.AudioCodecID()
switch codecID {
case codec.CodecID_AAC:
if !frame.IsSequence() || len(frame) < 4 {
return
}
if ua.Name == "" {
ua.Name = codecID.String()
}
a := NewAAC(ua.Stream)
ua.AudioTrack = a
a.SampleSize = 16
a.AVCCHead = []byte{frame[0], 1}
a.WriteAVCC(0, frame)
case codec.CodecID_PCMA,
codec.CodecID_PCMU:
if ua.Name == "" {
ua.Name = codecID.String()
}
alaw := true
if codecID == codec.CodecID_PCMU {
alaw = false
}
a := NewG711(ua.Stream, alaw)
ua.AudioTrack = a
a.SampleRate = uint32(codec.SoundRate[(frame[0]&0x0c)>>2])
a.SampleSize = 16
if frame[0]&0x02 == 0 {
a.SampleSize = 8
}
a.Channels = frame[0]&0x01 + 1
a.AVCCHead = frame[:1]
ua.AudioTrack.WriteAVCC(ts, frame)
default:
ua.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codecID)))
}
} else {
ua.AudioTrack.WriteAVCC(ts, frame)
}
}

View File

@@ -10,37 +10,6 @@ import (
"m7s.live/engine/v4/util"
)
// Base 基础Track类
type Base struct {
Name string
Stream IStream `json:"-"`
ts time.Time
bytes int
frames int
BPS int
FPS int
}
func (bt *Base) ComputeBPS(bytes int) {
bt.bytes += bytes
bt.frames++
if elapse := time.Since(bt.ts).Seconds(); elapse > 1 {
bt.BPS = bt.bytes / int(elapse)
bt.FPS = bt.frames / int(elapse)
bt.bytes = 0
bt.frames = 0
bt.ts = time.Now()
}
}
func (bt *Base) GetName() string {
return bt.Name
}
func (bt *Base) Flush(bf *BaseFrame) {
bt.ComputeBPS(bf.BytesIn)
bf.Timestamp = time.Now()
}
type 流速控制 struct {
起始时间戳 uint32
起始时间 time.Time

View File

@@ -162,19 +162,6 @@ func (vt *Video) ReadRing() *AVRing[NALUSlice] {
return vr
}
type UnknowVideo struct {
Base
VideoTrack
}
func (uv *UnknowVideo) GetName() string {
return uv.Base.GetName()
}
func (uv *UnknowVideo) Flush() {
uv.VideoTrack.Flush()
}
/*
Access Unit的首个nalu是4字节起始码。
这里举个例子说明用JM可以生成这样一段码流不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件:
@@ -190,32 +177,3 @@ Access Unit的首个nalu是4字节起始码。
I0(slice0)是序列第一帧I帧的第一个slice是当前Access Unit的首个nalu所以是4字节头。而I0(slice1)表示第一帧的第二个slice所以是3字节头。P1(slice0) 、P1(slice1)同理。
*/
func (vt *UnknowVideo) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
}
func (vt *UnknowVideo) WriteAVCC(ts uint32, frame AVCCFrame) {
if vt.VideoTrack == nil {
if frame.IsSequence() {
ts = 0
codecID := frame.VideoCodecID()
if vt.Name == "" {
vt.Name = codecID.String()
}
switch codecID {
case codec.CodecID_H264:
vt.VideoTrack = NewH264(vt.Stream)
case codec.CodecID_H265:
vt.VideoTrack = NewH265(vt.Stream)
default:
vt.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID)))
return
}
vt.VideoTrack.WriteAVCC(ts, frame)
} else {
vt.Stream.Warn("need sequence frame")
}
} else {
vt.VideoTrack.WriteAVCC(ts, frame)
}
}