fix: sip support tcp

This commit is contained in:
pggiroro
2025-09-09 11:08:00 +08:00
committed by langhuihui
parent fce3dcbd3d
commit 69ff04acb0
7 changed files with 72 additions and 77 deletions

View File

@@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/url"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/emiago/sipgo"
@@ -16,7 +16,6 @@ import (
"gorm.io/gorm"
"m7s.live/v5/pkg/util"
"github.com/rs/zerolog"
"google.golang.org/protobuf/types/known/timestamppb"
"m7s.live/v5/plugin/gb28181/pb"
gb28181 "m7s.live/v5/plugin/gb28181/pkg"
@@ -393,64 +392,16 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque
Code: 404,
Message: "device not found",
}
var device *Device
// 先从内存中获取设备
d, ok := gb.devices.Get(req.DeviceId)
if !ok && gb.DB != nil {
// 如果内存中没有且数据库存在,则从数据库查询
var device Device
if err := gb.DB.Where("device_id = ?", req.DeviceId).First(&device).Error; err == nil {
d = &device
// 恢复设备的必要字段
d.Logger = gb.Logger.With("deviceid", req.DeviceId)
d.channels.L = new(sync.RWMutex)
d.plugin = gb
// 初始化 Task
var hash uint32
for i := 0; i < len(d.DeviceId); i++ {
ch := d.DeviceId[i]
hash = hash*31 + uint32(ch)
}
d.Task.ID = hash
d.Task.Logger = d.Logger
d.Task.Context, d.Task.CancelCauseFunc = context.WithCancelCause(context.Background())
// 初始化 SIP 相关字段
d.fromHDR = sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: gb.Realm,
},
Params: sip.NewParams(),
}
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
d.contactHDR = sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: d.SipIp,
Port: d.Port,
},
}
d.Recipient = sip.Uri{
Host: d.IP,
Port: d.Port,
User: d.DeviceId,
}
// 初始化 SIP 客户端
d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
// 将设备添加到内存中
gb.devices.AddTask(d)
}
if d, ok := gb.devices.Get(req.DeviceId); ok {
device = d
}
if d != nil {
if device != nil {
// 发送目录查询请求
_, err := d.catalog()
_, err := device.catalog()
if err != nil {
resp.Code = 500
resp.Message = "catalog request failed"
@@ -458,7 +409,7 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque
} else {
resp.Code = 0
resp.Message = "sync request sent"
resp.Total = int32(d.ChannelCount)
resp.Total = int32(device.ChannelCount)
resp.Current = 0 // 初始化进度为0
}
}
@@ -1284,7 +1235,14 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (*
// Request-URI: sip:34020000001320000006@192.168.1.102:5060
// [Resent Packet: False]
// 初始化SIP客户端
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname("192.168.1.106"))
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname("192.168.1.106"))
if device.client == nil {
resp.Code = 500
resp.Message = "failed to create sip client"

View File

@@ -3,6 +3,7 @@ package plugin_gb28181pro
import (
"context"
"fmt"
"log/slog"
"os"
"strings"
"time"
@@ -11,7 +12,6 @@ import (
"github.com/emiago/sipgo/sip"
myip "github.com/husanpao/ip"
"github.com/icholy/digest"
"github.com/rs/zerolog"
"m7s.live/v5/pkg/task"
gb28181 "m7s.live/v5/plugin/gb28181/pkg"
)
@@ -52,7 +52,14 @@ func (c *Client) Start() (err error) {
// Check if host is private/internal network IP
//if util.IsPrivateIP(c.recipient.Host) {
c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(myip.InternalIPv4()), sipgo.WithClientPort(5061))
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(myip.InternalIPv4()), sipgo.WithClientPort(5061))
//} else {
// c.Client, err = sipgo.NewClient(c.conf.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(util.Routes[myip.InternalIPv4()]), sipgo.WithClientPort(5061))
//}
@@ -60,7 +67,7 @@ func (c *Client) Start() (err error) {
if err != nil {
return
}
c.srv, _ = sipgo.NewServer(c.conf.ua, sipgo.WithServerLogger(zerolog.New(os.Stdout)))
c.srv, _ = sipgo.NewServer(c.conf.ua, sipgo.WithServerLogger(logger))
contactHDR := sip.ContactHeader{
Address: sip.Uri{
User: c.conf.Serial,

View File

@@ -467,6 +467,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
func (d *Device) send(req *sip.Request) (*sip.Response, error) {
d.SN++
d.Trace("send", "req", req.String())
req.SetTransport(d.Transport)
return d.client.Do(context.Background(), req)
}

View File

@@ -230,9 +230,10 @@ func (d *Dialog) Start() (err error) {
"a=connection:new",
)
case mrtp.StreamModeUDP:
/* 支持udp收流 yjx
return errors.New("do not support udp mode")
*/
sdpInfo = append(sdpInfo,
"a=setup:active",
"a=connection:new",
)
default:
sdpInfo = append(sdpInfo,
"a=setup:passive",
@@ -265,7 +266,7 @@ func (d *Dialog) Start() (err error) {
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: "UDP",
Transport: device.Transport,
Host: device.MediaIp,
Port: device.LocalPort,
Params: sip.NewParams(),
@@ -305,7 +306,11 @@ func (d *Dialog) Start() (err error) {
//if runtime.GOOS == "windows" {
// d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
//} else {
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
if strings.ToLower(device.Transport) == "tcp" {
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
} else {
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
}
//}
// 最后添加Content-Length头部
if err != nil {
@@ -423,14 +428,10 @@ func (d *Dialog) Dispose() {
}
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
if d.session != nil {
if d.session != nil && d.session.InviteResponse != nil {
err := d.session.Bye(d)
if err != nil {
d.Error("dialog bye bye err", err)
}
err = d.session.Close()
if err != nil {
d.Error("dialog close session err", err)
}
}
}

View File

@@ -3,6 +3,7 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
@@ -18,7 +19,6 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/pion/rtp"
"github.com/rs/zerolog"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
@@ -164,8 +164,13 @@ func (gb *GB28181Plugin) Start() (err error) {
return pkg.ErrNoDB
}
gb.Info("GB28181 initing", gb.Platforms)
logger := zerolog.New(os.Stdout)
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
// 设置 TCP 传输模式
tcpOption := sip.WithTransportLayerConnectionReuse(true) // 启用连接重用
gb.ua, err = sipgo.NewUA(
sipgo.WithUserAgent("M7S/"+m7s.Version),
sipgo.WithUserAgentTransportLayerOptions(tcpOption), // 使用 TCP 选项
) // Build user agent
// Creating client handle for ua
if len(gb.Sip.ListenAddr) > 0 {
gb.AddTask(&catalogHandlerQueueTask)
@@ -371,7 +376,14 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
}
// 创建SIP客户端
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp))
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(device.SipIp))
device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR)
// 设置设备ID的hash值作为任务ID

View File

@@ -3,6 +3,7 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"log/slog"
"net"
"os"
"strconv"
@@ -13,7 +14,6 @@ import (
"github.com/emiago/sipgo/sip"
myip "github.com/husanpao/ip"
"github.com/icholy/digest"
"github.com/rs/zerolog"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
@@ -303,7 +303,15 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
d.KeepaliveTime = time.Now()
d.RegisterTime = time.Now()
d.Online = true
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
d.Transport = req.Transport()
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp))
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.plugin = task.gb
@@ -432,7 +440,14 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
d.Logger = task.gb.Logger.With("deviceid", deviceid)
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp))
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.Info("StoreDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "req.Recipient", req.Recipient, "myPort", myPort, "d.Recipient", d.Recipient)

View File

@@ -137,6 +137,7 @@ func (p *Receiver) Start() (err error) {
if err != nil {
return
}
p.OnStop(conn.Close)
rtpReader = NewRTPPayloadReader(NewRTPUDPReader(conn))
p.BufReader = util.NewBufReader(rtpReader)
case StreamModeManual: