feat: add speed play

This commit is contained in:
langhuihui
2024-04-12 17:48:26 +08:00
parent 51c85d10f7
commit d06d068f99
10 changed files with 196 additions and 51 deletions

View File

@@ -0,0 +1,10 @@
global:
loglevel: debug
tcp:
listenaddr: :50051
hdl:
publish:
speed: 2
pull:
pullonstart:
live/test: /Users/dexter/Movies/jb-demo.flv

15
example/readflv/main.go Normal file
View File

@@ -0,0 +1,15 @@
package main
import (
"context"
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/rtmp"
)
func main() {
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
m7s.Run(context.Background(), "config.yaml")
}

View File

@@ -33,6 +33,7 @@ type Publish struct {
PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时
BufferTime time.Duration `desc:"缓冲长度(单位:秒)0代表取最近关键帧"` // 缓冲长度(单位:秒)0代表取最近关键帧
SpeedLimit time.Duration `default:"500ms" desc:"速度限制最大等待时间,0则不等待"` //速度限制最大等待时间
Speed float64 `default:"1" desc:"倍速"` // 倍速
Key string `desc:"发布鉴权key"` // 发布鉴权key
SecretArgName string `default:"secret" desc:"发布鉴权参数名"` // 发布鉴权参数名
ExpireArgName string `default:"expire" desc:"发布鉴权失效时间参数名"` // 发布鉴权失效时间参数名

75
pkg/util/collection.go Normal file
View File

@@ -0,0 +1,75 @@
package util
import "slices"
type Collection[K comparable, T interface{ GetKey() K }] struct {
Items []T
m map[K]T
Length int
}
func (c *Collection[K, T]) Add(item T) {
c.Items = append(c.Items, item)
if c.Length > 100 || c.m != nil {
if c.m == nil {
c.m = make(map[K]T)
for _, v := range c.Items {
c.m[v.GetKey()] = v
}
}
c.m[item.GetKey()] = item
}
c.Length++
}
func (c *Collection[K, T]) AddUnique(item T) {
if _, ok := c.Get(item.GetKey()); !ok {
c.Add(item)
}
}
func (c *Collection[K, T]) Set(item T) {
key := item.GetKey()
if c.m != nil {
c.m[key] = item
}
for i := range c.Items {
if c.Items[i].GetKey() == key {
c.Items[i] = item
return
}
}
c.Add(item)
}
func (c *Collection[K, T]) Remove(item T) {
c.RemoveByKey(item.GetKey())
}
func (c *Collection[K, T]) RemoveByKey(key K) {
delete(c.m, key)
for i := range c.Length {
if c.Items[i].GetKey() == key {
c.Items = slices.Delete(c.Items, i, i+1)
break
}
}
c.Length--
}
func (c *Collection[K, T]) Get(key K) (item T, ok bool) {
if c.m != nil {
item, ok = c.m[key]
return item, ok
}
for _, item := range c.Items {
if item.GetKey() == key {
return item, true
}
}
return
}
func (c *Collection[K, T]) GetKey() K {
return c.Items[0].GetKey()
}

View File

@@ -1,5 +1,7 @@
package util
import "encoding/json"
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
@@ -234,3 +236,19 @@ func (l *List[T]) PushFrontList(other *List[T]) {
l.insertValue(e.Value, &l.root)
}
}
// MarshalJSON returns the JSON encoding of the list.
func (l *List[T]) MarshalJSON() ([]byte, error) {
// Create a slice to hold the JSON-encoded elements
elements := make([]T, l.Len())
// Iterate over the list and populate the slice
i := 0
for e := l.Front(); e != nil; e = e.Next() {
elements[i] = e.Value
i++
}
// Marshal the slice to JSON
return json.Marshal(elements)
}

View File

@@ -17,6 +17,12 @@ type HDLPlugin struct {
m7s.Plugin
}
func (p *HDLPlugin) OnInit() {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.Pull(streamPath, url, NewHDLPuller())
}
}
var _ = m7s.InstallPlugin[HDLPlugin]()
func (p *HDLPlugin) WriteFlvHeader(sub *m7s.Subscriber) (flv net.Buffers) {

View File

@@ -20,6 +20,12 @@ type HDLPuller struct {
pool *util.ScalableMemoryAllocator
}
func NewHDLPuller() *HDLPuller {
return &HDLPuller{
pool: util.NewScalableMemoryAllocator(1024),
}
}
func (puller *HDLPuller) Connect(p *m7s.Puller) (err error) {
if strings.HasPrefix(p.RemoteURL, "http") {
var res *http.Response
@@ -88,8 +94,10 @@ func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) {
mem := frame.Malloc(int(dataSize))
_, err = io.ReadFull(puller, mem)
if err != nil {
frame.Recycle()
return
}
frame.ReadFromBytes(mem)
puller.absTS = offsetTs + (timestamp - startTs)
frame.Timestamp = puller.absTS
// fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS)

View File

@@ -19,10 +19,31 @@ const (
PublisherStateWaitSubscriber
)
type SpeedControl struct {
speed float64
beginTime time.Time
beginTimestamp time.Duration
}
func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
if speed != s.speed {
s.speed = speed
s.beginTime = time.Now()
s.beginTimestamp = ts
} else {
elapsed := time.Since(s.beginTime)
should := time.Duration(float64(ts) / speed)
if should > elapsed {
time.Sleep(should - elapsed)
}
}
}
type Publisher struct {
PubSubBase
sync.RWMutex `json:"-" yaml:"-"`
config.Publish
SpeedControl
State PublisherState
VideoTrack *AVTrack
AudioTrack *AVTrack
@@ -34,6 +55,10 @@ type Publisher struct {
lastTs time.Duration
}
func (p *Publisher) GetKey() string {
return p.StreamPath
}
func (p *Publisher) timeout() (err error) {
switch p.State {
case PublisherStateInit:
@@ -113,6 +138,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
p.lastTs = frame.Timestamp
p.Debug("write", "seq", frame.Sequence)
t.Step()
p.speedControl(p.Publish.Speed, p.lastTs)
}
func (p *Publisher) WriteVideo(data IAVFrame) (err error) {

View File

@@ -47,12 +47,10 @@ type Server struct {
ID int
eventChan chan any
Plugins []*Plugin
Streams map[string]*Publisher
Pulls map[string]*Puller
Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller]
Waiting map[string][]*Subscriber
Publishers []*Publisher
Subscribers []*Subscriber
Pullers []*Puller
Subscribers util.Collection[int, *Subscriber]
pidG int
sidG int
apiList []string
@@ -62,8 +60,6 @@ type Server struct {
func NewServer() (s *Server) {
s = &Server{
ID: int(serverIndexG.Add(1)),
Streams: make(map[string]*Publisher),
Pulls: make(map[string]*Puller),
Waiting: make(map[string][]*Subscriber),
eventChan: make(chan any, 10),
}
@@ -86,8 +82,6 @@ type rawconfig = map[string]map[string]any
func (s *Server) reset() {
server := Server{
ID: s.ID,
Streams: make(map[string]*Publisher),
Pulls: make(map[string]*Puller),
Waiting: make(map[string][]*Subscriber),
eventChan: make(chan any, 10),
}
@@ -191,10 +185,10 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
s.eventLoop()
err = context.Cause(s)
s.Warn("Server is done", "reason", err)
for _, publisher := range s.Publishers {
for _, publisher := range s.Streams.Items {
publisher.Stop(err)
}
for _, subscriber := range s.Subscribers {
for _, subscriber := range s.Subscribers.Items {
subscriber.Stop(err)
}
for _, p := range s.Plugins {
@@ -210,7 +204,7 @@ func (s *Server) eventLoop() {
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.Done())}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(pulse.C)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.eventChan)}}
var pubCount, subCount int
addPublisher := func(publisher *Publisher) {
if nl := len(s.Publishers); nl > pubCount {
if nl := s.Streams.Length; nl > pubCount {
pubCount = nl
if subCount == 0 {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(publisher.Done())})
@@ -224,7 +218,7 @@ func (s *Server) eventLoop() {
case 0:
return
case 1:
for _, publisher := range s.Streams {
for _, publisher := range s.Streams.Items {
if err := publisher.checkTimeout(); err != nil {
publisher.Stop(err)
}
@@ -241,19 +235,24 @@ func (s *Server) eventLoop() {
case 2:
event := rev.Interface()
switch v := event.(type) {
case *util.Promise[*Publisher]:
err := s.OnPublish(v.Value)
case *util.Promise[any]:
switch vv := v.Value.(type) {
case *Publisher:
err := s.OnPublish(vv)
if v.Fulfill(err); err != nil {
continue
}
event = v.Value
addPublisher(v.Value)
event = vv
addPublisher(vv)
}
case *util.Promise[*Publisher]:
case *util.Promise[*Subscriber]:
err := s.OnSubscribe(v.Value)
if v.Fulfill(err); err != nil {
continue
}
if nl := len(s.Subscribers); nl > subCount {
if nl := s.Subscribers.Length; nl > subCount {
subCount = nl
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())})
}
@@ -262,7 +261,7 @@ func (s *Server) eventLoop() {
}
event = v.Value
case *util.Promise[*Puller]:
if _, ok := s.Pulls[v.Value.StreamPath]; ok {
if _, ok := s.Pulls.Get(v.Value.StreamPath); ok {
v.Fulfill(ErrStreamExist)
continue
} else {
@@ -271,20 +270,16 @@ func (s *Server) eventLoop() {
if err != nil {
continue
}
s.Pulls[v.Value.StreamPath] = v.Value
s.Pullers = append(s.Pullers, v.Value)
s.Pulls.Add(v.Value)
addPublisher(&v.Value.Publisher)
event = v.Value
}
case *util.Promise[*StreamSnapShot]:
v.Value.Publisher = s.Streams[v.Value.StreamPath]
v.Value.Publisher, _ = s.Streams.Get(v.Value.StreamPath)
v.Fulfill(nil)
continue
case *util.Promise[*pb.StopSubscribeRequest]:
if index := slices.IndexFunc(s.Subscribers, func(s *Subscriber) bool {
return s.ID == int(v.Value.Id)
}); index >= 0 {
subscriber := s.Subscribers[index]
if subscriber, ok := s.Subscribers.Get(int(v.Value.Id)); ok {
subscriber.Stop(errors.New("stop by api"))
v.Fulfill(nil)
} else {
@@ -300,14 +295,12 @@ func (s *Server) eventLoop() {
}
default:
if subStart, pubIndex := 3+pubCount, chosen-3; chosen < subStart {
s.onUnpublish(s.Publishers[pubIndex])
s.onUnpublish(s.Streams.Items[pubIndex])
pubCount--
s.Publishers = slices.Delete(s.Publishers, pubIndex, pubIndex+1)
} else {
i := chosen - subStart
s.onUnsubscribe(s.Subscribers[i])
s.onUnsubscribe(s.Subscribers.Items[i])
subCount--
s.Subscribers = slices.Delete(s.Subscribers, i, i+1)
}
cases = slices.Delete(cases, chosen, chosen+1)
}
@@ -315,6 +308,7 @@ func (s *Server) eventLoop() {
}
func (s *Server) onUnsubscribe(subscriber *Subscriber) {
s.Subscribers.Remove(subscriber)
s.Info("unsubscribe", "streamPath", subscriber.StreamPath)
if subscriber.Closer != nil {
subscriber.Close()
@@ -333,8 +327,8 @@ func (s *Server) onUnsubscribe(subscriber *Subscriber) {
}
func (s *Server) onUnpublish(publisher *Publisher) {
delete(s.Streams, publisher.StreamPath)
s.Info("unpublish", "streamPath", publisher.StreamPath, "count", len(s.Streams))
s.Streams.Remove(publisher)
s.Info("unpublish", "streamPath", publisher.StreamPath, "count", s.Streams.Length)
for subscriber := range publisher.Subscribers {
s.Waiting[publisher.StreamPath] = append(s.Waiting[publisher.StreamPath], subscriber)
subscriber.TimeoutTimer.Reset(publisher.WaitCloseTimeout)
@@ -342,15 +336,11 @@ func (s *Server) onUnpublish(publisher *Publisher) {
if publisher.Closer != nil {
publisher.Close()
}
if puller, ok := s.Pulls[publisher.StreamPath]; ok {
delete(s.Pulls, publisher.StreamPath)
index := slices.Index(s.Pullers, puller)
s.Pullers = slices.Delete(s.Pullers, index, index+1)
}
s.Pulls.RemoveByKey(publisher.StreamPath)
}
func (s *Server) OnPublish(publisher *Publisher) error {
if oldPublisher, ok := s.Streams[publisher.StreamPath]; ok {
if oldPublisher, ok := s.Streams.Get(publisher.StreamPath); ok {
if publisher.KickExist {
publisher.Warn("kick")
oldPublisher.Stop(ErrKick)
@@ -363,8 +353,7 @@ func (s *Server) OnPublish(publisher *Publisher) error {
publisher.Subscribers = make(map[*Subscriber]struct{})
publisher.TransTrack = make(map[reflect.Type]*AVTrack)
}
s.Streams[publisher.StreamPath] = publisher
s.Publishers = append(s.Publishers, publisher)
s.Streams.Set(publisher)
s.pidG++
p := publisher.Plugin
publisher.ID = s.pidG
@@ -388,9 +377,9 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error {
subscriber.ID = s.sidG
subscriber.Logger = subscriber.Plugin.With("streamPath", subscriber.StreamPath, "suber", subscriber.ID)
subscriber.TimeoutTimer = time.NewTimer(subscriber.Plugin.config.Subscribe.WaitTimeout)
s.Subscribers = append(s.Subscribers, subscriber)
s.Subscribers.Add(subscriber)
subscriber.Info("subscribe")
if publisher, ok := s.Streams[subscriber.StreamPath]; ok {
if publisher, ok := s.Streams.Get(subscriber.StreamPath); ok {
return publisher.AddSubscriber(subscriber)
} else {
s.Waiting[subscriber.StreamPath] = append(s.Waiting[subscriber.StreamPath], subscriber)

View File

@@ -23,12 +23,9 @@ type PubSubBase struct {
io.Closer `json:"-" yaml:"-"`
}
// func (ps *PubSubBase) Stop(reason error) {
// ps.Unit.Stop(reason)
// if ps.Closer != nil {
// ps.Closer.Close()
// }
// }
func (p *PubSubBase) GetKey() int {
return p.ID
}
func (ps *PubSubBase) Init(p *Plugin, streamPath string, options ...any) {
ps.Plugin = p