mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-06 09:06:52 +08:00
fix: stream close process
This commit is contained in:
@@ -40,10 +40,9 @@ type Publish struct {
|
|||||||
BufferTime time.Duration // 缓冲长度(单位:秒),0代表取最近关键帧
|
BufferTime time.Duration // 缓冲长度(单位:秒),0代表取最近关键帧
|
||||||
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
|
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
|
||||||
Key string // 发布鉴权key
|
Key string // 发布鉴权key
|
||||||
SecretArgName string `default:"secret"` // 发布鉴权参数名
|
SecretArgName string `default:"secret"` // 发布鉴权参数名
|
||||||
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名
|
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名
|
||||||
RingSize int `default:"256"` // 初始缓冲区大小
|
RingSize string `default:"256-1024"` // 初始缓冲区大小
|
||||||
RingSizeMax int `default:"1024"` // 最大缓冲区大小
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Publish) GetPublishConfig() Publish {
|
func (c Publish) GetPublishConfig() Publish {
|
||||||
|
12
stream.go
12
stream.go
@@ -327,16 +327,13 @@ func (r *Stream) action(action StreamAction) (ok bool) {
|
|||||||
case STATE_CLOSED:
|
case STATE_CLOSED:
|
||||||
Streams.Delete(r.Path)
|
Streams.Delete(r.Path)
|
||||||
r.timeout.Stop()
|
r.timeout.Stop()
|
||||||
r.Subscribers.Dispose()
|
|
||||||
for !r.actionChan.Close() {
|
|
||||||
// 等待channel发送完毕,伪自旋锁
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
}
|
|
||||||
stateEvent = SEclose{event}
|
stateEvent = SEclose{event}
|
||||||
r.Subscribers.Broadcast(stateEvent)
|
r.Subscribers.Broadcast(stateEvent)
|
||||||
r.Tracks.Range(func(_ string, t Track) {
|
r.Tracks.Range(func(_ string, t Track) {
|
||||||
t.Dispose()
|
t.Dispose()
|
||||||
})
|
})
|
||||||
|
r.Subscribers.Dispose()
|
||||||
|
r.actionChan.Close()
|
||||||
}
|
}
|
||||||
EventBus <- stateEvent
|
EventBus <- stateEvent
|
||||||
if r.Publisher != nil {
|
if r.Publisher != nil {
|
||||||
@@ -475,6 +472,11 @@ func (s *Stream) run() {
|
|||||||
case action, ok := <-s.actionChan.C:
|
case action, ok := <-s.actionChan.C:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
|
} else if s.State == STATE_CLOSED {
|
||||||
|
if s.actionChan.Close() { //再次尝试关闭
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
timeStart = time.Now()
|
timeStart = time.Now()
|
||||||
switch v := action.(type) {
|
switch v := action.(type) {
|
||||||
|
@@ -153,7 +153,7 @@ func (av *Media) SetStuff(stuff ...any) {
|
|||||||
case IStream:
|
case IStream:
|
||||||
pubConf := v.GetPublisherConfig()
|
pubConf := v.GetPublisherConfig()
|
||||||
av.Base.SetStuff(v)
|
av.Base.SetStuff(v)
|
||||||
av.Init(pubConf.RingSize, NewAVFrame)
|
av.Init(256, NewAVFrame)
|
||||||
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
|
||||||
av.等待上限 = pubConf.SpeedLimit
|
av.等待上限 = pubConf.SpeedLimit
|
||||||
case uint32:
|
case uint32:
|
||||||
|
@@ -20,7 +20,7 @@ const (
|
|||||||
SUBMODE_BUFFER
|
SUBMODE_BUFFER
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrDiscard = errors.New("dsicard")
|
var ErrDiscard = errors.New("discard")
|
||||||
|
|
||||||
type AVRingReader struct {
|
type AVRingReader struct {
|
||||||
RingReader[any, *common.AVFrame]
|
RingReader[any, *common.AVFrame]
|
||||||
|
Reference in New Issue
Block a user