fix: writeTrailerTask never dispose

This commit is contained in:
langhuihui
2024-11-10 15:58:41 +08:00
parent e0f4d79d75
commit 70a34010c9
10 changed files with 860 additions and 763 deletions

14
api.go
View File

@@ -160,9 +160,9 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
} }
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) { func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
var fillData func(m task.ITask) *pb.TaskTreeResponse var fillData func(m task.ITask) *pb.TaskTreeData
fillData = func(m task.ITask) (res *pb.TaskTreeResponse) { fillData = func(m task.ITask) (res *pb.TaskTreeData) {
res = &pb.TaskTreeResponse{Id: m.GetTaskID(), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: m.GetDescriptions()} res = &pb.TaskTreeData{Id: m.GetTaskID(), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: m.GetDescriptions()}
if job, ok := m.(task.IJob); ok { if job, ok := m.(task.IJob); ok {
if blockedTask := job.Blocked(); blockedTask != nil { if blockedTask := job.Blocked(); blockedTask != nil {
res.Blocked = fillData(blockedTask) res.Blocked = fillData(blockedTask)
@@ -173,7 +173,7 @@ func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResp
} }
return return
} }
res = fillData(&Servers) res = &pb.TaskTreeResponse{Data: fillData(&Servers)}
return return
} }
@@ -361,7 +361,7 @@ func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *empty
func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) { func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) {
if s, ok := Servers.Get(req.Id); ok { if s, ok := Servers.Get(req.Id); ok {
s.Stop(pkg.ErrStopFromAPI) s.Stop(task.ErrStopByUser)
} else { } else {
return nil, pkg.ErrNotFound return nil, pkg.ErrNotFound
} }
@@ -439,7 +439,7 @@ func (s *Server) SeekStream(ctx context.Context, req *pb.SeekStreamRequest) (res
func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) {
s.Streams.Call(func() error { s.Streams.Call(func() error {
if s, ok := s.Streams.Get(req.StreamPath); ok { if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Stop(pkg.ErrStopFromAPI) s.Stop(task.ErrStopByUser)
} }
return nil return nil
}) })
@@ -711,7 +711,7 @@ func (s *Server) RemoveDevice(ctx context.Context, req *pb.RequestWithId) (res *
err = tx.Error err = tx.Error
s.Devices.Call(func() error { s.Devices.Call(func() error {
if device, ok := s.Devices.Get(uint(req.Id)); ok { if device, ok := s.Devices.Get(uint(req.Id)); ok {
device.Stop(pkg.ErrStopFromAPI) device.Stop(task.ErrStopByUser)
} }
return nil return nil
}) })

View File

@@ -7,8 +7,6 @@ import (
"strings" "strings"
"time" "time"
"m7s.live/pro/pkg"
"gorm.io/gorm" "gorm.io/gorm"
"m7s.live/pro/pkg/config" "m7s.live/pro/pkg/config"
"m7s.live/pro/pkg/task" "m7s.live/pro/pkg/task"
@@ -158,7 +156,7 @@ func (d *DeviceTask) Dispose() {
d.TickTask.Dispose() d.TickTask.Dispose()
d.Plugin.Server.Streams.Call(func() error { d.Plugin.Server.Streams.Call(func() error {
if stream, ok := d.Plugin.Server.Streams.Get(d.Device.GetStreamPath()); ok { if stream, ok := d.Plugin.Server.Streams.Get(d.Device.GetStreamPath()); ok {
stream.Stop(pkg.ErrStopFromAPI) stream.Stop(task.ErrStopByUser)
} }
return nil return nil
}) })

File diff suppressed because it is too large Load Diff

View File

@@ -231,15 +231,21 @@ message SysInfoResponse {
SysInfoData data = 3; SysInfoData data = 3;
} }
message TaskTreeResponse { message TaskTreeData {
uint32 id = 1; uint32 id = 1;
uint32 type = 2; uint32 type = 2;
string owner = 3; string owner = 3;
google.protobuf.Timestamp startTime = 4; google.protobuf.Timestamp startTime = 4;
map<string, string> description = 5; map<string, string> description = 5;
repeated TaskTreeResponse children = 6; repeated TaskTreeData children = 6;
uint32 state = 7; uint32 state = 7;
TaskTreeResponse blocked = 8; TaskTreeData blocked = 8;
}
message TaskTreeResponse {
int32 code = 1;
string message = 2;
TaskTreeData data = 3;
} }
message StreamListRequest { message StreamListRequest {

View File

@@ -4,7 +4,6 @@ import "errors"
var ( var (
ErrNotFound = errors.New("not found") ErrNotFound = errors.New("not found")
ErrStopFromAPI = errors.New("stop from api")
ErrStreamExist = errors.New("stream exist") ErrStreamExist = errors.New("stream exist")
ErrKick = errors.New("kick") ErrKick = errors.New("kick")
ErrDiscard = errors.New("discard") ErrDiscard = errors.New("discard")

View File

@@ -7,7 +7,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"m7s.live/pro" m7s "m7s.live/pro"
"m7s.live/pro/pkg" "m7s.live/pro/pkg"
"m7s.live/pro/pkg/codec" "m7s.live/pro/pkg/codec"
"m7s.live/pro/pkg/task" "m7s.live/pro/pkg/task"
@@ -30,8 +30,14 @@ func (task *writeTrailerTask) Start() (err error) {
err = task.muxer.WriteTrailer() err = task.muxer.WriteTrailer()
if err != nil { if err != nil {
task.Error("write trailer", "err", err) task.Error("write trailer", "err", err)
return task.muxer.File.Close() if errClose := task.muxer.File.Close(); errClose != nil {
} else { return errClose
}
}
return
}
func (task *writeTrailerTask) Run() (err error) {
task.Info("write trailer") task.Info("write trailer")
var temp *os.File var temp *os.File
temp, err = os.CreateTemp("", "*.mp4") temp, err = os.CreateTemp("", "*.mp4")
@@ -66,7 +72,6 @@ func (task *writeTrailerTask) Start() (err error) {
} }
return return
} }
}
func init() { func init() {
m7s.Servers.AddTask(&writeTrailerQueueTask) m7s.Servers.AddTask(&writeTrailerQueueTask)

View File

@@ -250,6 +250,9 @@ func (c *NetConnection) ReadResponse() (res *util.Response, err error) {
func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) error, onRTCP func(byte, []byte) error) (err error) { func (c *NetConnection) Receive(sendMode bool, onReceive func(byte, []byte) error, onRTCP func(byte, []byte) error) (err error) {
for err == nil { for err == nil {
if err = c.StopReason(); err != nil {
return
}
ts := time.Now() ts := time.Now()
if err = c.conn.SetReadDeadline(ts.Add(util.Conditional(sendMode, time.Second*60, time.Second*15))); err != nil { if err = c.conn.SetReadDeadline(ts.Add(util.Conditional(sendMode, time.Second*60, time.Second*15))); err != nil {
return return

View File

@@ -4,11 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"m7s.live/pro/pkg/config" "m7s.live/pro/pkg/config"
"m7s.live/pro/pkg/task"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"m7s.live/pro" "m7s.live/pro"
gpb "m7s.live/pro/pb" gpb "m7s.live/pro/pb"
"m7s.live/pro/pkg"
hdl "m7s.live/pro/plugin/flv/pkg" hdl "m7s.live/pro/plugin/flv/pkg"
rtmp "m7s.live/pro/plugin/rtmp/pkg" rtmp "m7s.live/pro/plugin/rtmp/pkg"
rtsp "m7s.live/pro/plugin/rtsp/pkg" rtsp "m7s.live/pro/plugin/rtsp/pkg"
@@ -31,7 +31,7 @@ func (r *StressPlugin) pull(count int, format, url string, puller m7s.Puller) (e
} }
} else if count < i { } else if count < i {
for j := i; j > count; j-- { for j := i; j > count; j-- {
r.pullers.Items[j-1].Stop(pkg.ErrStopFromAPI) r.pullers.Items[j-1].Stop(task.ErrStopByUser)
r.pullers.Remove(r.pullers.Items[j-1]) r.pullers.Remove(r.pullers.Items[j-1])
} }
} }
@@ -53,7 +53,7 @@ func (r *StressPlugin) push(count int, streamPath, format, remoteHost string, pu
} }
} else if count < i { } else if count < i {
for j := i; j > count; j-- { for j := i; j > count; j-- {
r.pushers.Items[j-1].Stop(pkg.ErrStopFromAPI) r.pushers.Items[j-1].Stop(task.ErrStopByUser)
r.pushers.Remove(r.pushers.Items[j-1]) r.pushers.Remove(r.pushers.Items[j-1])
} }
} }
@@ -82,7 +82,7 @@ func (r *StressPlugin) PullHDL(ctx context.Context, req *pb.PullRequest) (res *g
func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) { func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) {
for pusher := range r.pushers.Range { for pusher := range r.pushers.Range {
pusher.Stop(pkg.ErrStopFromAPI) pusher.Stop(task.ErrStopByUser)
} }
r.pushers.Clear() r.pushers.Clear()
return &gpb.SuccessResponse{}, nil return &gpb.SuccessResponse{}, nil
@@ -90,7 +90,7 @@ func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *g
func (r *StressPlugin) StopPull(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) { func (r *StressPlugin) StopPull(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) {
for puller := range r.pullers.Range { for puller := range r.pullers.Range {
puller.Stop(pkg.ErrStopFromAPI) puller.Stop(task.ErrStopByUser)
} }
r.pullers.Clear() r.pullers.Clear()
return &gpb.SuccessResponse{}, nil return &gpb.SuccessResponse{}, nil

View File

@@ -128,8 +128,10 @@ func (p *PullJob) Publish() (err error) {
p.Publisher.Type = PublishTypePull p.Publisher.Type = PublishTypePull
if err == nil && p.conf.MaxRetry != 0 { if err == nil && p.conf.MaxRetry != 0 {
p.Publisher.OnDispose(func() { p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, pkg.ErrStopFromAPI) { if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) {
p.Stop(p.Publisher.StopReason()) p.Stop(p.Publisher.StopReason())
} else {
p.puller.Stop(p.Publisher.StopReason())
} }
}) })
} }

View File

@@ -80,8 +80,10 @@ func (p *TransformJob) Publish(streamPath string) (err error) {
p.Publisher.Type = PublishTypeTransform p.Publisher.Type = PublishTypeTransform
if err == nil { if err == nil {
p.Publisher.OnDispose(func() { p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, pkg.ErrStopFromAPI) { if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) {
p.Stop(p.Publisher.StopReason()) p.Stop(p.Publisher.StopReason())
} else {
p.Transformer.Stop(p.Publisher.StopReason())
} }
}) })
} }