From 69ff04acb0f6904563132cd3cba49d1b552be80e Mon Sep 17 00:00:00 2001 From: pggiroro Date: Tue, 9 Sep 2025 11:08:00 +0800 Subject: [PATCH] fix: sip support tcp --- plugin/gb28181/api.go | 72 +++++++------------------------ plugin/gb28181/client.go | 13 ++++-- plugin/gb28181/device.go | 1 + plugin/gb28181/dialog.go | 21 ++++----- plugin/gb28181/index.go | 20 +++++++-- plugin/gb28181/registerhandler.go | 21 +++++++-- plugin/rtp/pkg/transceiver.go | 1 + 7 files changed, 72 insertions(+), 77 deletions(-) diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index 29d8286..b4a8600 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "net/url" "os" "sort" "strings" - "sync" "time" "github.com/emiago/sipgo" @@ -16,7 +16,6 @@ import ( "gorm.io/gorm" "m7s.live/v5/pkg/util" - "github.com/rs/zerolog" "google.golang.org/protobuf/types/known/timestamppb" "m7s.live/v5/plugin/gb28181/pb" gb28181 "m7s.live/v5/plugin/gb28181/pkg" @@ -393,64 +392,16 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque Code: 404, Message: "device not found", } + var device *Device // 先从内存中获取设备 - d, ok := gb.devices.Get(req.DeviceId) - if !ok && gb.DB != nil { - // 如果内存中没有且数据库存在,则从数据库查询 - var device Device - if err := gb.DB.Where("device_id = ?", req.DeviceId).First(&device).Error; err == nil { - d = &device - // 恢复设备的必要字段 - d.Logger = gb.Logger.With("deviceid", req.DeviceId) - d.channels.L = new(sync.RWMutex) - d.plugin = gb - - // 初始化 Task - var hash uint32 - for i := 0; i < len(d.DeviceId); i++ { - ch := d.DeviceId[i] - hash = hash*31 + uint32(ch) - } - d.Task.ID = hash - d.Task.Logger = d.Logger - d.Task.Context, d.Task.CancelCauseFunc = context.WithCancelCause(context.Background()) - - // 初始化 SIP 相关字段 - d.fromHDR = sip.FromHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: gb.Realm, - }, - Params: sip.NewParams(), - } - d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) - - d.contactHDR = sip.ContactHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: d.SipIp, - Port: d.Port, - }, - } - - d.Recipient = sip.Uri{ - Host: d.IP, - Port: d.Port, - User: d.DeviceId, - } - - // 初始化 SIP 客户端 - d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp)) - - // 将设备添加到内存中 - gb.devices.AddTask(d) - } + if d, ok := gb.devices.Get(req.DeviceId); ok { + device = d } - if d != nil { + if device != nil { // 发送目录查询请求 - _, err := d.catalog() + _, err := device.catalog() if err != nil { resp.Code = 500 resp.Message = "catalog request failed" @@ -458,7 +409,7 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque } else { resp.Code = 0 resp.Message = "sync request sent" - resp.Total = int32(d.ChannelCount) + resp.Total = int32(device.ChannelCount) resp.Current = 0 // 初始化进度为0 } } @@ -1284,7 +1235,14 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (* // Request-URI: sip:34020000001320000006@192.168.1.102:5060 // [Resent Packet: False] // 初始化SIP客户端 - device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname("192.168.1.106")) + opts := &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + } + logHandler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(logHandler) + slog.SetDefault(logger) // 设置为默认日志记录器 + device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname("192.168.1.106")) if device.client == nil { resp.Code = 500 resp.Message = "failed to create sip client" diff --git a/plugin/gb28181/client.go b/plugin/gb28181/client.go index 360ca9a..7c43514 100644 --- a/plugin/gb28181/client.go +++ b/plugin/gb28181/client.go @@ -3,6 +3,7 @@ package plugin_gb28181pro import ( "context" "fmt" + "log/slog" "os" "strings" "time" @@ -11,7 +12,6 @@ import ( "github.com/emiago/sipgo/sip" myip "github.com/husanpao/ip" "github.com/icholy/digest" - "github.com/rs/zerolog" "m7s.live/v5/pkg/task" gb28181 "m7s.live/v5/plugin/gb28181/pkg" ) @@ -52,7 +52,14 @@ func (c *Client) Start() (err error) { // Check if host is private/internal network IP //if util.IsPrivateIP(c.recipient.Host) { - c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(myip.InternalIPv4()), sipgo.WithClientPort(5061)) + opts := &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + } + logHandler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(logHandler) + slog.SetDefault(logger) // 设置为默认日志记录器 + c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(myip.InternalIPv4()), sipgo.WithClientPort(5061)) //} else { // c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(util.Routes[myip.InternalIPv4()]), sipgo.WithClientPort(5061)) //} @@ -60,7 +67,7 @@ func (c *Client) Start() (err error) { if err != nil { return } - c.srv, _ = sipgo.NewServer(c.conf.ua, sipgo.WithServerLogger(zerolog.New(os.Stdout))) + c.srv, _ = sipgo.NewServer(c.conf.ua, sipgo.WithServerLogger(logger)) contactHDR := sip.ContactHeader{ Address: sip.Uri{ User: c.conf.Serial, diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 9906ee5..ca36706 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -467,6 +467,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 func (d *Device) send(req *sip.Request) (*sip.Response, error) { d.SN++ d.Trace("send", "req", req.String()) + req.SetTransport(d.Transport) return d.client.Do(context.Background(), req) } diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index e22881b..aa4cc96 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -230,9 +230,10 @@ func (d *Dialog) Start() (err error) { "a=connection:new", ) case mrtp.StreamModeUDP: - /* 支持udp收流 yjx - return errors.New("do not support udp mode") - */ + sdpInfo = append(sdpInfo, + "a=setup:active", + "a=connection:new", + ) default: sdpInfo = append(sdpInfo, "a=setup:passive", @@ -265,7 +266,7 @@ func (d *Dialog) Start() (err error) { viaHeader := sip.ViaHeader{ ProtocolName: "SIP", ProtocolVersion: "2.0", - Transport: "UDP", + Transport: device.Transport, Host: device.MediaIp, Port: device.LocalPort, Params: sip.NewParams(), @@ -305,7 +306,11 @@ func (d *Dialog) Start() (err error) { //if runtime.GOOS == "windows" { // d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader) //} else { - d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader) + if strings.ToLower(device.Transport) == "tcp" { + d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader) + } else { + d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader) + } //} // 最后添加Content-Length头部 if err != nil { @@ -423,14 +428,10 @@ func (d *Dialog) Dispose() { } } d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId) - if d.session != nil { + if d.session != nil && d.session.InviteResponse != nil { err := d.session.Bye(d) if err != nil { d.Error("dialog bye bye err", err) } - err = d.session.Close() - if err != nil { - d.Error("dialog close session err", err) - } } } diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 23588b6..1300f55 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -3,6 +3,7 @@ package plugin_gb28181pro import ( "errors" "fmt" + "log/slog" "net" "net/http" "os" @@ -18,7 +19,6 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/pion/rtp" - "github.com/rs/zerolog" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" @@ -164,8 +164,13 @@ func (gb *GB28181Plugin) Start() (err error) { return pkg.ErrNoDB } gb.Info("GB28181 initing", gb.Platforms) - logger := zerolog.New(os.Stdout) - gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + // 设置 TCP 传输模式 + tcpOption := sip.WithTransportLayerConnectionReuse(true) // 启用连接重用 + gb.ua, err = sipgo.NewUA( + sipgo.WithUserAgent("M7S/"+m7s.Version), + sipgo.WithUserAgentTransportLayerOptions(tcpOption), // 使用 TCP 选项 + ) // Build user agent // Creating client handle for ua if len(gb.Sip.ListenAddr) > 0 { gb.AddTask(&catalogHandlerQueueTask) @@ -371,7 +376,14 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) { } // 创建SIP客户端 - device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp)) + opts := &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + } + logHandler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(logHandler) + slog.SetDefault(logger) // 设置为默认日志记录器 + device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(device.SipIp)) device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR) // 设置设备ID的hash值作为任务ID diff --git a/plugin/gb28181/registerhandler.go b/plugin/gb28181/registerhandler.go index 1ecc957..bcbc37a 100644 --- a/plugin/gb28181/registerhandler.go +++ b/plugin/gb28181/registerhandler.go @@ -3,6 +3,7 @@ package plugin_gb28181pro import ( "errors" "fmt" + "log/slog" "net" "os" "strconv" @@ -13,7 +14,6 @@ import ( "github.com/emiago/sipgo/sip" myip "github.com/husanpao/ip" "github.com/icholy/digest" - "github.com/rs/zerolog" "gorm.io/gorm" "m7s.live/v5" "m7s.live/v5/pkg/task" @@ -303,7 +303,15 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) { d.KeepaliveTime = time.Now() d.RegisterTime = time.Now() d.Online = true - d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp)) + d.Transport = req.Transport() + opts := &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + } + logHandler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(logHandler) + slog.SetDefault(logger) // 设置为默认日志记录器 + d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp)) d.channels.L = new(sync.RWMutex) d.catalogReqs.L = new(sync.RWMutex) d.plugin = task.gb @@ -432,7 +440,14 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request, d.Logger = task.gb.Logger.With("deviceid", deviceid) d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) - d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp)) + opts := &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + } + logHandler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(logHandler) + slog.SetDefault(logger) // 设置为默认日志记录器 + d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp)) d.channels.L = new(sync.RWMutex) d.catalogReqs.L = new(sync.RWMutex) d.Info("StoreDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "req.Recipient", req.Recipient, "myPort", myPort, "d.Recipient", d.Recipient) diff --git a/plugin/rtp/pkg/transceiver.go b/plugin/rtp/pkg/transceiver.go index 4aee7ad..1ead725 100644 --- a/plugin/rtp/pkg/transceiver.go +++ b/plugin/rtp/pkg/transceiver.go @@ -137,6 +137,7 @@ func (p *Receiver) Start() (err error) { if err != nil { return } + p.OnStop(conn.Close) rtpReader = NewRTPPayloadReader(NewRTPUDPReader(conn)) p.BufReader = util.NewBufReader(rtpReader) case StreamModeManual: