diff --git a/README.md b/README.md index e1d029e..9f16394 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ - 获取engine信息 `/api/sysInfo` 返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]} - 获取系统基本情况 `/api/summary` 返回值Summary数据 - 获取所有插件信息 `/api/plugins` 返回值Plugin数据 +- 读取mp4文件再次发布为视频流 `/api/replay/mp4?streamPath=xxx&dump=filepath` filepath是文件路径 +- 读取ts文件再次发布为视频流 `/api/replay/ts?streamPath=xxx&dump=filepath` filepath是文件路径 - 获取指定的配置信息 `/api/getconfig?name=xxx` 返回xxx插件的配置信息,如果不带参数或参数为空则返回全局配置 - 修改并保存配置信息 `/api/modifyconfig?name=xxx&yaml=1` 修改xxx插件的配置信息,在请求的body中传入修改后的配置yaml字符串 - 热更新配置信息 `/api/updateconfig?name=xxx` 热更新xxx插件的配置信息,如果不带参数或参数为空则热更新全局配置 diff --git a/config/types.go b/config/types.go index ce39a37..f7cac40 100755 --- a/config/types.go +++ b/config/types.go @@ -62,6 +62,7 @@ type Subscribe struct { Key string // 订阅鉴权key SecretArgName string `default:"secret"` // 订阅鉴权参数名 ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名 + Internal bool `default:"false"` // 是否内部订阅 } func (c *Subscribe) GetSubscribeConfig() *Subscribe { diff --git a/go.mod b/go.mod index 9bdd858..72aeeae 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/quic-go/qtls-go1-20 v0.1.0 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect - github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 + github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 github.com/yusufpapurcu/wmi v1.2.2 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect diff --git a/go.sum b/go.sum index 0ca92ae..6a11735 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,8 @@ github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYm github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33 h1:uyZY++dluUg7iTSsNzuOVln/mC2U2KXwgKLfKLCJ74Y= github.com/yapingcat/gomedia v0.0.0-20230222121919-c67df405bf33/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc= +github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274 h1:cj4I+bvWX9I+Hg6tnZ7DAiOVxzhyLhdvYVKp+WpM/2c= +github.com/yapingcat/gomedia v0.0.0-20230426092936-387031404274/go.mod h1:WSZ59bidJOO40JSJmLqlkBJrjZCtjbKKkygEMfzY/kc= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/http.go b/http.go index 866428f..9a2d605 100644 --- a/http.go +++ b/http.go @@ -22,6 +22,11 @@ type GlobalConfig struct { config.Engine } +func ShouldYaml(r *http.Request) bool { + format := r.URL.Query().Get("format") + return r.URL.Query().Get("yaml") != "" || format == "yaml" +} + func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.Write([]byte("Monibuca API Server\n")) for _, api := range apiList { @@ -34,11 +39,11 @@ func fetchSummary() *Summary { } func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { - format := r.URL.Query().Get("format") + y := ShouldYaml(r) if r.Header.Get("Accept") == "text/event-stream" { summary.Add() defer summary.Done() - if format == "yaml" { + if y { util.ReturnYaml(fetchSummary, time.Second, rw, r) } else { util.ReturnJson(fetchSummary, time.Second, rw, r) @@ -47,7 +52,7 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { if !summary.Running() { summary.collect() } - if format == "yaml" { + if y { if err := yaml.NewEncoder(rw).Encode(&summary); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) } @@ -71,7 +76,11 @@ func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request) { func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request) { if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" { if s := Streams.Get(streamPath); s != nil { - util.ReturnJson(func() *Stream { return s }, time.Second, rw, r) + if ShouldYaml(r) { + util.ReturnYaml(func() *Stream { return s }, time.Second, rw, r) + } else { + util.ReturnJson(func() *Stream { return s }, time.Second, rw, r) + } } else { http.Error(rw, NO_SUCH_STREAM, http.StatusNotFound) } @@ -113,7 +122,7 @@ func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request) } else { p = Engine } - if q.Get("yaml") != "" { + if ShouldYaml(r) { mm, err := yaml.Marshal(p.RawConfig) if err != nil { mm = []byte("") @@ -145,7 +154,7 @@ func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Reques } else { p = Engine } - if q.Has("yaml") { + if ShouldYaml(r) { err = yaml.NewDecoder(r.Body).Decode(&p.Modified) } else { err = json.NewDecoder(r.Body).Decode(&p.Modified) @@ -346,9 +355,9 @@ func (c *GlobalConfig) API_replay_ps(w http.ResponseWriter, r *http.Request) { pub.SetIO(f) if err = Engine.Publish(streamPath, &pub); err == nil { go pub.Replay(f) - w.Write([]byte("ok")) + w.Write([]byte("ok")) } else { http.Error(w, err.Error(), http.StatusInternalServerError) } } -} \ No newline at end of file +} diff --git a/io.go b/io.go index 284a0bc..7f5c10e 100644 --- a/io.go +++ b/io.go @@ -129,6 +129,20 @@ var ( OnAuthPub func(p *util.Promise[IPublisher]) error ) +func (io *IO) auth(key string, secret string, expire string) bool { + if unixTime, err := strconv.ParseInt(expire, 16, 64); err != nil || time.Now().Unix() > unixTime { + return false + } + trueSecret := md5.Sum([]byte(key + io.Stream.Path + expire)) + for i := 0; i < 16; i++ { + hex, err := strconv.ParseInt(secret[i<<1:(i<<1)+2], 16, 16) + if trueSecret[i] != byte(hex) || err != nil { + return false + } + } + return true +} + // receive 用于接收发布或者订阅 func (io *IO) receive(streamPath string, specific IIO) error { streamPath = strings.Trim(streamPath, "/") @@ -206,13 +220,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { return err } } else if conf.Key != "" { - secret := io.Args.Get(conf.SecretArgName) - t := io.Args.Get(conf.ExpireArgName) - if unixTime, err := strconv.ParseInt(t, 16, 64); err != nil || time.Now().Unix() > unixTime { - return ErrAuth - } - trueSecret := md5.Sum([]byte(conf.Key + s.StreamName + t)) - if string(trueSecret[:]) != secret { + if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) { return ErrAuth } } @@ -247,13 +255,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { return err } } else if conf := specific.(ISubscriber).GetSubscriber().Config; conf.Key != "" { - secret := io.Args.Get(conf.SecretArgName) - t := io.Args.Get(conf.ExpireArgName) - if unixTime, err := strconv.ParseInt(t, 16, 64); err != nil || time.Now().Unix() > unixTime { - return ErrAuth - } - trueSecret := md5.Sum([]byte(conf.Key + s.StreamName + t)) - if string(trueSecret[:]) != secret { + if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) { return ErrAuth } } diff --git a/lang/zh.yaml b/lang/zh.yaml index 20369d1..593db3f 100644 --- a/lang/zh.yaml +++ b/lang/zh.yaml @@ -48,6 +48,7 @@ reamins: 剩余 "first frame read": 第一帧已读取 "fu have no start": rtp的FU起始包丢了 "disabled by env": 被环境变量禁用 +"event cost too much time": 事件处理耗时过长 firstTs: 第一帧时间戳 firstSeq: 第一帧序列号 skipSeq: 跳过序列号 diff --git a/publisher-mp4.go b/publisher-mp4.go index e8a9ef4..2d1ca7b 100644 --- a/publisher-mp4.go +++ b/publisher-mp4.go @@ -50,11 +50,11 @@ func (p *MP4Publisher) ReadMP4Data(source io.ReadSeeker) error { } switch pkg.Cid { case mp4.MP4_CODEC_H264, mp4.MP4_CODEC_H265: - p.VideoTrack.WriteAnnexB(uint32(pkg.Pts), uint32(pkg.Dts), pkg.Data) + p.VideoTrack.WriteAnnexB(uint32(pkg.Pts*90), uint32(pkg.Dts*90), pkg.Data) case mp4.MP4_CODEC_AAC: - p.AudioTrack.WriteADTS(uint32(pkg.Pts), pkg.Data) + p.AudioTrack.WriteADTS(uint32(pkg.Pts*90), pkg.Data) case mp4.MP4_CODEC_G711A, mp4.MP4_CODEC_G711U: - p.AudioTrack.WriteRaw(uint32(pkg.Pts), pkg.Data) + p.AudioTrack.WriteRaw(uint32(pkg.Pts*90), pkg.Data) } } } diff --git a/stream.go b/stream.go index f4bfd13..f6ef56f 100644 --- a/stream.go +++ b/stream.go @@ -416,9 +416,6 @@ func (s *Stream) run() { s.Subscribers.Broadcast(event) } }) - if s.State != STATE_PUBLISHING { - continue - } if s.Tracks.Len() == 0 || (s.Publisher != nil && s.Publisher.IsClosed()) { s.action(ACTION_PUBLISHLOST) } else { @@ -530,6 +527,8 @@ func (s *Stream) run() { if _, ok := v.Value.(*track.Audio); ok && !s.GetPublisherConfig().PubVideo { s.Subscribers.AbortWait() } + // 这里重置的目的是当PublishTimeout设置很大的情况下,需要及时取消订阅者的等待 + s.timeout.Reset(time.Second * 5) } else { v.Reject(ErrBadTrackName) } diff --git a/subscriber.go b/subscriber.go index f5cb018..200b285 100644 --- a/subscriber.go +++ b/subscriber.go @@ -115,7 +115,6 @@ type TrackPlayer struct { // Subscriber 订阅者实体定义 type Subscriber struct { IO - IsInternal bool //是否内部订阅,不放入订阅列表 Config *config.Subscribe TrackPlayer `json:"-" yaml:"-"` } @@ -384,7 +383,7 @@ func (s *Subscriber) PlayBlock(subType byte) { func (s *Subscriber) onStop() { if !s.Stream.IsClosed() { s.Info("stop") - if !s.IsInternal { + if !s.Config.Internal { s.Stream.Receive(s.Spesific) } } diff --git a/subscribers.go b/subscribers.go index 8e4d5f2..96dc5af 100644 --- a/subscribers.go +++ b/subscribers.go @@ -105,7 +105,7 @@ func (s *Subscribers) Delete(suber ISubscriber) { func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks) { io := suber.GetSubscriber() - if io.IsInternal { + if io.Config.Internal { s.internal[suber] = wait io.Info("innersuber +1", zap.Int("remains", len(s.internal))) } else { diff --git a/track/audio.go b/track/audio.go index be61e29..8b3730a 100644 --- a/track/audio.go +++ b/track/audio.go @@ -85,8 +85,8 @@ func (a *Audio) CompleteRTP(value *AVFrame) { } func (a *Audio) Narrow() { - if a.HistoryRing == nil && a.IDRing != nil { - a.narrow(int(a.Value.Sequence - a.IDRing.Value.Sequence)) - } + // if a.HistoryRing == nil && a.IDRing != nil { + // a.narrow(int(a.Value.Sequence - a.IDRing.Value.Sequence)) + // } a.AddIDR() }