fix: tcp read block

langhuihui
2025-06-04 10:05:23 +08:00
parent dbf820b845
commit cf218215ff
19 changed files with 122 additions and 95 deletions

api.go
View File

@@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
@@ -718,7 +717,7 @@ func (s *Server) GetConfigFile(_ context.Context, req *emptypb.Empty) (res *pb.G
func (s *Server) UpdateConfigFile(_ context.Context, req *pb.UpdateConfigFileRequest) (res *pb.SuccessResponse, err error) {
if s.configFileContent != nil {
s.configFileContent = []byte(req.Content)
os.WriteFile(filepath.Join(ExecDir, s.conf.(string)), s.configFileContent, 0644)
os.WriteFile(s.configFilePath, s.configFileContent, 0644)
res = &pb.SuccessResponse{}
} else {
err = pkg.ErrNotFound

View File

@@ -56,6 +56,7 @@ func (mt *Job) Blocked() ITask {
}
func (mt *Job) waitChildrenDispose() {
blocked := mt.blocked
defer func() {
// Ignore the panic: a race condition can exist during task shutdown; when the parent task closes, its children may already have been released.
if err := recover(); err != nil {
@@ -64,7 +65,7 @@ func (mt *Job) waitChildrenDispose() {
mt.addSub <- nil
<-mt.childrenDisposed
}()
if blocked := mt.blocked; blocked != nil {
if blocked != nil {
blocked.Stop(mt.StopReason())
}
}
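
The recover above tolerates a specific race: mt.addSub <- nil can land on a channel the child loop has already torn down. A minimal sketch of that pattern outside the project, with a hypothetical notify helper (assumption: send-on-closed-channel is the only panic expected here):

    package main

    import "fmt"

    // notify sends a shutdown sentinel but tolerates the receiver having
    // already closed the channel, which would otherwise panic the sender.
    func notify(ch chan int) (ok bool) {
        defer func() {
            if r := recover(); r != nil {
                ok = false // send on closed channel: the other side won the race
            }
        }()
        ch <- 0
        return true
    }

    func main() {
        ch := make(chan int, 1)
        fmt.Println(notify(ch)) // true
        close(ch)
        fmt.Println(notify(ch)) // false, recovered from the panic
    }
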
@@ -181,10 +182,8 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
return
}
if len(mt.addSub) > 10 {
if mt.Logger != nil {
mt.Warn("task wait list too many", "count", len(mt.addSub), "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType(), "parent", mt.GetOwnerType())
}
}
mt.addSub <- t
return
}
@@ -206,9 +205,7 @@ func (mt *Job) run() {
defer func() {
err := recover()
if err != nil {
if mt.Logger != nil {
mt.Logger.Error("job panic", "err", err, "stack", string(debug.Stack()))
}
mt.Error("job panic", "err", err, "stack", string(debug.Stack()))
if !ThrowPanic {
mt.Stop(errors.Join(err.(error), ErrPanic))
} else {
@@ -227,6 +224,7 @@ func (mt *Job) run() {
mt.blocked = nil
if chosen, rev, ok := reflect.Select(mt.cases); chosen == 0 {
if rev.IsNil() {
mt.Debug("job addSub channel closed, exiting", "taskId", mt.GetTaskID())
return
}
if mt.blocked = rev.Interface().(ITask); mt.blocked.start() {

View File

@@ -24,15 +24,11 @@ func (m *Manager[K, T]) Add(ctx T, opt ...any) *Task {
ctx.Stop(ErrExist)
return
}
if m.Logger != nil {
m.Logger.Debug("add", "key", ctx.GetKey(), "count", m.Length)
}
m.Debug("add", "key", ctx.GetKey(), "count", m.Length)
})
ctx.OnDispose(func() {
m.Remove(ctx)
if m.Logger != nil {
m.Logger.Debug("remove", "key", ctx.GetKey(), "count", m.Length)
}
m.Debug("remove", "key", ctx.GetKey(), "count", m.Length)
})
return m.AddTask(ctx, opt...)
}

View File

@@ -7,6 +7,7 @@ import (
"log/slog"
"maps"
"reflect"
"runtime"
"runtime/debug"
"strings"
"sync"
@@ -117,7 +118,7 @@ type (
ID uint32
StartTime time.Time
StartReason string
*slog.Logger
Logger *slog.Logger
context.Context
context.CancelCauseFunc
handler ITask
@@ -198,7 +199,11 @@ func (task *Task) WaitStopped() (err error) {
}
func (task *Task) Trace(msg string, fields ...any) {
task.Log(task.Context, TraceLevel, msg, fields...)
if task.Logger == nil {
slog.Default().Log(task.Context, TraceLevel, msg, fields...)
return
}
task.Logger.Log(task.Context, TraceLevel, msg, fields...)
}
func (task *Task) IsStopped() bool {
@@ -225,8 +230,9 @@ func (task *Task) Stop(err error) {
panic("task stop with nil error")
}
if task.CancelCauseFunc != nil {
if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL {
task.Debug("task stop", "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType())
if tt := task.handler.GetTaskType(); tt != TASK_TYPE_CALL {
_, file, line, _ := runtime.Caller(1)
task.Debug("task stop", "caller", fmt.Sprintf("%s:%d", strings.TrimPrefix(file, sourceFilePathPrefix), line), "reason", err, "elapsed", time.Since(task.StartTime), "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType())
}
task.CancelCauseFunc(err)
}
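
The new runtime.Caller(1) call records which line of code requested the stop. A standalone sketch of the same trick; sourceFilePathPrefix in the diff is the project's path-trimming prefix, so the prefix parameter here is an assumption:

    package main

    import (
        "fmt"
        "runtime"
        "strings"
    )

    // callerSite returns "file:line" of the frame skip+1 levels up the
    // stack, with a common path prefix trimmed for readable logs.
    func callerSite(skip int, prefix string) string {
        _, file, line, ok := runtime.Caller(skip + 1) // +1 skips callerSite itself
        if !ok {
            return "unknown"
        }
        return fmt.Sprintf("%s:%d", strings.TrimPrefix(file, prefix), line)
    }

    func main() {
        fmt.Println(callerSite(0, "")) // prints this file and line number
    }
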
@@ -264,22 +270,18 @@ func (task *Task) checkRetry(err error) bool {
if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry {
task.retry.RetryCount++
task.SetDescription("retryCount", task.retry.RetryCount)
if task.Logger != nil {
if task.retry.MaxRetry < 0 {
task.Warn(fmt.Sprintf("retry %d/∞", task.retry.RetryCount), "taskId", task.ID)
} else {
task.Warn(fmt.Sprintf("retry %d/%d", task.retry.RetryCount, task.retry.MaxRetry), "taskId", task.ID)
}
}
if delta := time.Since(task.StartTime); delta < task.retry.RetryInterval {
time.Sleep(task.retry.RetryInterval - delta)
}
return true
} else {
if task.retry.MaxRetry > 0 {
if task.Logger != nil {
task.Warn(fmt.Sprintf("max retry %d failed", task.retry.MaxRetry))
}
return false
}
}
@@ -292,15 +294,13 @@ func (task *Task) start() bool {
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
if task.Logger != nil {
task.Error("panic", "error", err, "stack", string(debug.Stack()))
}
}
}()
}
for {
task.StartTime = time.Now()
if tt := task.handler.GetTaskType(); task.Logger != nil && tt != TASK_TYPE_CALL {
if tt := task.handler.GetTaskType(); tt != TASK_TYPE_CALL {
task.Debug("task start", "taskId", task.ID, "taskType", tt, "ownerType", task.GetOwnerType(), "reason", task.StartReason)
}
task.state = TASK_STATE_STARTING
@@ -322,9 +322,7 @@ func (task *Task) start() bool {
task.ResetRetryCount()
if runHandler, ok := task.handler.(TaskBlock); ok {
task.state = TASK_STATE_RUNNING
if task.Logger != nil {
task.Debug("task run", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
}
err = runHandler.Run()
if err == nil {
err = ErrTaskComplete
@@ -335,9 +333,7 @@ func (task *Task) start() bool {
if err == nil {
if goHandler, ok := task.handler.(TaskGo); ok {
task.state = TASK_STATE_GOING
if task.Logger != nil {
task.Debug("task go", "taskId", task.ID, "taskType", task.GetTaskType(), "ownerType", task.GetOwnerType())
}
go task.run(goHandler.Go)
}
return true
@@ -384,20 +380,18 @@ func (task *Task) SetDescriptions(value Description) {
func (task *Task) dispose() {
taskType, ownerType := task.handler.GetTaskType(), task.GetOwnerType()
if task.state < TASK_STATE_STARTED {
if task.Logger != nil && taskType != TASK_TYPE_CALL {
if taskType != TASK_TYPE_CALL {
task.Debug("task dispose canceled", "taskId", task.ID, "taskType", taskType, "ownerType", ownerType, "state", task.state)
}
return
}
reason := task.StopReason()
task.state = TASK_STATE_DISPOSING
if task.Logger != nil {
if taskType != TASK_TYPE_CALL {
yargs := []any{"reason", reason, "taskId", task.ID, "taskType", taskType, "ownerType", ownerType}
task.Debug("task dispose", yargs...)
defer task.Debug("task disposed", yargs...)
}
}
befores := len(task.beforeDisposeListeners)
for i, listener := range task.beforeDisposeListeners {
task.SetDescription("disposeProcess", fmt.Sprintf("b:%d/%d", i, befores))
@@ -441,11 +435,9 @@ func (task *Task) run(handler func() error) {
if !ThrowPanic {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
if task.Logger != nil {
task.Error("panic", "error", err, "stack", string(debug.Stack()))
}
}
}
if err == nil {
task.Stop(ErrTaskComplete)
} else {
@@ -454,3 +446,39 @@ func (task *Task) run(handler func() error) {
}()
err = handler()
}
func (task *Task) Debug(msg string, args ...any) {
if task.Logger == nil {
slog.Default().Debug(msg, args...)
return
}
task.Logger.Debug(msg, args...)
}
func (task *Task) Info(msg string, args ...any) {
if task.Logger == nil {
slog.Default().Info(msg, args...)
return
}
task.Logger.Info(msg, args...)
}
func (task *Task) Warn(msg string, args ...any) {
if task.Logger == nil {
slog.Default().Warn(msg, args...)
return
}
task.Logger.Warn(msg, args...)
}
func (task *Task) Error(msg string, args ...any) {
if task.Logger == nil {
slog.Default().Error(msg, args...)
return
}
task.Logger.Error(msg, args...)
}
func (task *Task) TraceEnabled() bool {
if task.Logger == nil {
return slog.Default().Enabled(task.Context, TraceLevel)
}
return task.Logger.Enabled(task.Context, TraceLevel)
}
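
These wrappers replace the embedded *slog.Logger with a named field plus a nil fallback to slog.Default(), so tasks can log before a logger is attached. A small usage sketch of the same pattern (the Task here is a stripped-down stand-in, not the project's):

    package main

    import "log/slog"

    // Task holds an optional logger; wrapper methods fall back to the
    // process-wide default, so callers never nil-check before logging.
    type Task struct {
        Logger *slog.Logger
    }

    func (t *Task) Warn(msg string, args ...any) {
        if t.Logger == nil {
            slog.Default().Warn(msg, args...)
            return
        }
        t.Logger.Warn(msg, args...)
    }

    func main() {
        var t Task
        t.Warn("no logger attached yet") // goes to slog.Default()

        t.Logger = slog.Default().With("taskId", 42)
        t.Warn("retry", "attempt", 1) // now carries taskId=42
    }
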

View File

@@ -436,7 +436,7 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque
if err := gb.DB.Where("id = ?", req.DeviceId).First(&device).Error; err == nil {
d = &device
// Restore the device's required fields
d.Logger = gb.With("id", req.DeviceId)
d.Logger = gb.Logger.With("id", req.DeviceId)
d.channels.L = new(sync.RWMutex)
d.plugin = gb

View File

@@ -220,7 +220,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
device.eventChan = make(chan any, 10)
// Set the logger
device.Logger = gb.With("deviceid", device.DeviceId)
device.Logger = gb.Logger.With("deviceid", device.DeviceId)
// Initialize the channel set
device.channels.L = new(sync.RWMutex)

View File

@@ -22,9 +22,7 @@ package gb28181
import (
"fmt"
"log/slog"
"net"
"os"
"strconv"
"strings"
"sync"
@@ -61,7 +59,6 @@ type RTPForwarder struct {
SendInterval time.Duration // send interval; can be used for throttling
lastSendTime time.Time // time of the last send
stopChan chan struct{} // stop signal channel
*slog.Logger
StreamMode string // stream transport mode: UDP (UDP transport) / TCP-ACTIVE (TCP active mode) / TCP-PASSIVE (TCP passive mode)
}
@@ -71,7 +68,6 @@ func NewRTPForwarder() *RTPForwarder {
FeedChan: make(chan []byte, 2000), // larger buffer to reduce the risk of dropped packets
SendInterval: time.Millisecond * 0, // no send-interval limit by default; forward at maximum speed
stopChan: make(chan struct{}),
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
}
ret.bufferPool = sync.Pool{
@@ -90,7 +86,7 @@ func (p *RTPForwarder) ReadRTP(rtpBuf util.Buffer) (err error) {
return
}
if p.Enabled(p, task.TraceLevel) {
if p.TraceEnabled() {
p.Trace("rtp", "len", rtpBuf.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
}
@@ -347,7 +343,7 @@ func (p *RTPForwarder) Demux() {
}
p.lastSendTime = time.Now()
if p.Enabled(p, task.TraceLevel) && p.ForwardCount%1000 == 0 {
if p.TraceEnabled() && p.ForwardCount%1000 == 0 {
p.Trace("forward rtp packet", "count", p.ForwardCount, "TCP", p.TCP, "TCPActive", p.TCPActive)
}
}
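
Replacing p.Enabled(p, task.TraceLevel) with the TraceEnabled() wrapper keeps a cheap level check guarding the per-packet trace call. A sketch of why the guard matters on a hot path, assuming a custom trace level below slog's Debug:

    package main

    import (
        "context"
        "log/slog"
        "os"
    )

    const TraceLevel = slog.Level(-8) // assumption: a custom level below Debug

    func main() {
        logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
        ctx := context.Background()

        for seq := 0; seq < 3; seq++ {
            // Enabled is a cheap level comparison; without the guard, the
            // attribute list below would be built for every RTP packet
            // even when trace output is discarded.
            if logger.Enabled(ctx, TraceLevel) {
                logger.Log(ctx, TraceLevel, "rtp", "len", 1200, "seq", seq)
            }
        }
    }
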

View File

@@ -148,7 +148,7 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
return
}
if lastSeq == 0 || p.SequenceNumber == lastSeq+1 {
if p.Enabled(p, task.TraceLevel) {
if p.TraceEnabled() {
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.SSRC)
}
copyData := make([]byte, len(p.Payload))

View File

@@ -3,6 +3,12 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
myip "github.com/husanpao/ip"
@@ -12,11 +18,6 @@ import (
"m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
"net"
"os"
"strconv"
"sync"
"time"
)
type DeviceRegisterQueueTask struct {
@@ -419,7 +420,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
d.plugin = task.gb
d.LocalPort = myPort
d.Logger = task.gb.With("deviceid", deviceid)
d.Logger = task.gb.Logger.With("deviceid", deviceid)
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
d.channels.L = new(sync.RWMutex)

View File

@@ -40,7 +40,7 @@ type RTMPServer struct {
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &RTMPServer{conf: p}
ret.Init(conn)
ret.Logger = p.With("remote", conn.RemoteAddr().String())
ret.Logger = p.Logger.With("remote", conn.RemoteAddr().String())
return ret
}

View File

@@ -5,6 +5,7 @@ import (
"net"
"runtime"
"sync/atomic"
"time"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
@@ -128,6 +129,7 @@ func (nc *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) error
// }
func (nc *NetConnection) readChunk() (msg *Chunk, err error) {
nc.SetReadDeadline(time.Now().Add(time.Second * 5)) // set a 5-second read deadline
head, err := nc.ReadByte()
if err != nil {
return nil, err
@@ -313,6 +315,9 @@ func (nc *NetConnection) RecvMessage() (msg *Chunk, err error) {
}
}
}
if nc.IsStopped() {
err = nc.StopReason()
}
}
return
}
@@ -344,6 +349,7 @@ func (nc *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
if sid, ok := msg.(HaveStreamID); ok {
head.MessageStreamID = sid.GetStreamID()
}
nc.SetWriteDeadline(time.Now().Add(time.Second * 5)) // set a 5-second write deadline
return nc.sendChunk(net.Buffers{nc.tmpBuf}, head, RTMP_CHUNK_HEAD_12)
}

View File

@@ -154,17 +154,17 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
err = parseSequence()
return
case PacketTypeCodedFrames:
switch ctx := t.ICodecCtx.(type) {
switch t.ICodecCtx.(type) {
case *H265Ctx:
if avcc.CTS, err = reader.ReadBE(3); err != nil {
return err
}
avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
// avcc.filterH265(int(ctx.RecordInfo.LengthSizeMinusOne) + 1)
case *AV1Ctx:
// return avcc.parseAV1(reader)
}
case PacketTypeCodedFramesX:
avcc.filterH265(int(t.ICodecCtx.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1)
// avcc.filterH265(int(t.ICodecCtx.(*H265Ctx).RecordInfo.LengthSizeMinusOne) + 1)
}
} else {
b0, err = reader.ReadByte() //sequence frame flag

View File

@@ -29,7 +29,7 @@ type RTSPPlugin struct {
func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &RTSPServer{NetConnection: NewNetConnection(conn), conf: p}
ret.Logger = p.With("remote", conn.RemoteAddr().String())
ret.Logger = p.Logger.With("remote", conn.RemoteAddr().String())
return ret
}

View File

@@ -46,7 +46,7 @@ func (task *RTSPServer) Go() (err error) {
if task.URL == nil {
task.URL = req.URL
task.Logger = task.With("url", task.URL.String())
task.Logger = task.Logger.With("url", task.URL.String())
task.UserAgent = req.Header.Get("User-Agent")
task.Info("connect", "userAgent", task.UserAgent)
}

View File

@@ -3,6 +3,7 @@ package plugin_stress
import (
"context"
"fmt"
"slices"
"strings"
"github.com/mcuadros/go-defaults"
@@ -37,15 +38,18 @@ func (r *StressPlugin) pull(count int, url string, testMode int32, puller m7s.Pu
if err = ctx.WaitStarted(); err != nil {
return
}
r.pullers.AddUnique(ctx)
if r.pullers.AddUnique(ctx) {
ctx.OnDispose(func() {
r.pullers.Remove(ctx)
})
} else {
ctx.Stop(task.ErrExist)
}
}
} else if count < i {
clone := slices.Clone(r.pullers.Items)
for j := i; j > count; j-- {
r.pullers.Items[j-1].Stop(task.ErrStopByUser)
r.pullers.Remove(r.pullers.Items[j-1])
clone[j-1].Stop(task.ErrStopByUser)
}
}
return
@@ -61,15 +65,18 @@ func (r *StressPlugin) push(count int, streamPath, url string, pusher m7s.Pusher
if err = ctx.WaitStarted(); err != nil {
return
}
r.pushers.AddUnique(ctx)
if r.pushers.AddUnique(ctx) {
ctx.OnDispose(func() {
r.pushers.Remove(ctx)
})
} else {
ctx.Stop(task.ErrExist)
}
}
} else if count < i {
clone := slices.Clone(r.pushers.Items)
for j := i; j > count; j-- {
r.pushers.Items[j-1].Stop(task.ErrStopByUser)
r.pushers.Remove(r.pushers.Items[j-1])
clone[j-1].Stop(task.ErrStopByUser)
}
}
return
@@ -110,18 +117,16 @@ func (r *StressPlugin) StartPull(ctx context.Context, req *pb.PullRequest) (res
}
func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) {
for pusher := range r.pushers.Range {
for _, pusher := range slices.Clone(r.pushers.Items) {
pusher.Stop(task.ErrStopByUser)
}
r.pushers.Clear()
return &gpb.SuccessResponse{}, nil
}
func (r *StressPlugin) StopPull(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) {
for puller := range r.pullers.Range {
for _, puller := range slices.Clone(r.pullers.Items) {
puller.Stop(task.ErrStopByUser)
}
r.pullers.Clear()
return &gpb.SuccessResponse{}, nil
}
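
Both loops now iterate slices.Clone(...) because Stop triggers the OnDispose callback, which removes the item from the same collection; mutating a slice while ranging over it skips elements. A stand-in sketch (Collection and Item are hypothetical, loosely modeled on util.Collection):

    package main

    import (
        "fmt"
        "slices"
    )

    type Item struct{ ID int }

    // Collection is a minimal stand-in: removing during iteration of Items
    // would skip entries, so StopAll walks a snapshot instead.
    type Collection struct{ Items []*Item }

    func (c *Collection) Remove(it *Item) {
        c.Items = slices.DeleteFunc(c.Items, func(x *Item) bool { return x == it })
    }

    func (c *Collection) StopAll(stop func(*Item)) {
        for _, it := range slices.Clone(c.Items) { // snapshot first
            stop(it)
        }
    }

    func main() {
        c := &Collection{Items: []*Item{{1}, {2}, {3}}}
        c.StopAll(func(it *Item) {
            fmt.Println("stopping", it.ID)
            c.Remove(it) // safe: we range over the clone, not c.Items
        })
        fmt.Println("remaining:", len(c.Items))
    }
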

View File

@@ -1,8 +1,6 @@
package plugin_stress
import (
"sync"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/stress/pb"
@@ -18,7 +16,5 @@ type StressPlugin struct {
var _ = m7s.InstallPlugin[StressPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)
func (r *StressPlugin) OnInit() error {
r.pushers.L = &sync.RWMutex{}
r.pullers.L = &sync.RWMutex{}
return nil
}

View File

@@ -298,7 +298,7 @@ func (p *Publisher) fixTimestamp(t *AVTrack, data IAVFrame) {
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
t.AcceptFrame(data)
if p.Enabled(p, task.TraceLevel) {
if p.TraceEnabled() {
frame := &t.Value
codec := t.FourCC().String()
p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(data.GetTimestamp()/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", data.GetSize(), "data", data.String())

View File

@@ -107,6 +107,7 @@ type (
lastSummaryTime time.Time
lastSummary *pb.SummaryResponse
conf any
configFilePath string
configFileContent []byte
disabledPlugins []*Plugin
prometheusDesc prometheusDesc
@@ -237,6 +238,7 @@ func (s *Server) Start() (err error) {
if _, err = os.Stat(v); err != nil {
v = filepath.Join(ExecDir, v)
}
s.configFilePath = v
if configYaml, err = os.ReadFile(v); err != nil {
s.Warn("read config file failed", "error", err.Error())
} else {
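
Storing s.configFilePath at startup lets UpdateConfigFile (first hunk above) rewrite the same file that was actually loaded. A sketch of the resolution logic, assuming ExecDir is the executable's directory:

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // resolveConfigPath mirrors the startup logic: if the configured path
    // doesn't exist as given, retry it relative to the executable's directory.
    func resolveConfigPath(execDir, path string) string {
        if _, err := os.Stat(path); err != nil {
            return filepath.Join(execDir, path)
        }
        return path
    }

    func main() {
        exe, _ := os.Executable()
        fmt.Println(resolveConfigPath(filepath.Dir(exe), "config.yaml"))
    }
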

View File

@@ -299,7 +299,7 @@ func (handler *SubscribeHandler[A, V]) sendAudioFrame() (err error) {
} else if handler.awi > 0 && len(handler.audioFrame.Wraps) > handler.awi-1 {
frame := handler.audioFrame.Wraps[handler.awi-1]
frameSize := frame.GetSize()
if handler.s.Enabled(handler.s, task.TraceLevel) {
if handler.s.TraceEnabled() {
handler.s.Trace("send audio frame", "seq", handler.audioFrame.Sequence, "data", frame.String(), "size", frameSize)
}
if audioFrame, ok := frame.(A); ok {
@@ -344,7 +344,7 @@ func (handler *SubscribeHandler[A, V]) sendVideoFrame() (err error) {
if handler.vwi > 0 && len(handler.videoFrame.Wraps) > handler.vwi-1 {
frame := handler.videoFrame.Wraps[handler.vwi-1]
frameSize := frame.GetSize()
if handler.s.Enabled(handler.s, task.TraceLevel) {
if handler.s.TraceEnabled() {
handler.s.Trace("send video frame", "seq", handler.videoFrame.Sequence, "data", frame.String(), "size", frameSize)
}
if videoFrame, ok := frame.(V); ok {