fix: sip client reuse,correct trasnport,decode xml

This commit is contained in:
pggiroro
2025-11-08 19:58:53 +08:00
parent bc6cad2529
commit 7e64183b05
9 changed files with 619 additions and 188 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/url"
"os"
"sort"
@@ -1150,6 +1149,7 @@ func (gb *GB28181Plugin) QueryRecord(ctx context.Context, req *pb.QueryRecordReq
resp := &pb.QueryRecordResponse{
Code: 0,
Message: "",
Data: []*pb.RecordItem{},
}
startTime, endTime, err := util.TimeRangeQueryParse(url.Values{"range": []string{req.Range}, "start": []string{req.Start}, "end": []string{req.End}})
// 获取设备和通道
@@ -1200,14 +1200,21 @@ func (gb *GB28181Plugin) QueryRecord(ctx context.Context, req *pb.QueryRecordReq
resp.DeviceId = req.DeviceId
resp.ChannelId = req.ChannelId
resp.Name = firstResponse.Name
resp.Count = int32(recordReq.ReceivedNum)
resp.SumNum = int32(recordReq.SumNum)
if !firstResponse.LastTime.IsZero() {
resp.LastTime = timestamppb.New(firstResponse.LastTime)
}
}
for _, record := range recordReq.Response {
if len(record.RecordList.Item) == 0 {
continue
}
for _, item := range record.RecordList.Item {
// 过滤无效的记录(所有字段都为空)
if item.DeviceID == "" && item.StartTime == "" {
continue
}
resp.Data = append(resp.Data, &pb.RecordItem{
DeviceId: item.DeviceID,
Name: item.Name,
@@ -1222,6 +1229,7 @@ func (gb *GB28181Plugin) QueryRecord(ctx context.Context, req *pb.QueryRecordReq
}
}
resp.Count = int32(recordReq.SumNum)
resp.Code = 0
resp.Message = fmt.Sprintf("success, received %d/%d records", recordReq.ReceivedNum, recordReq.SumNum)
@@ -1290,6 +1298,7 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (*
device := &Device{
DeviceId: "34020000002000000001",
SipIp: "192.168.1.106",
LocalPort: 5060,
Port: 5060,
IP: "192.168.1.102",
StreamMode: "TCP-PASSIVE",
@@ -1317,20 +1326,15 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (*
// Method: INVITE
// Request-URI: sip:34020000001320000006@192.168.1.102:5060
// [Resent Packet: False]
// 初始化SIP客户端
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname("192.168.1.106"))
if device.client == nil {
// 根据设备的SipIp、LocalPort和Transport获取或创建对应的Client
// 测试默认使用UDP
client, err := gb.getOrCreateClient(device.SipIp, device.LocalPort, "UDP")
if err != nil {
resp.Code = 500
resp.Message = "failed to create sip client"
resp.Message = fmt.Sprintf("创建Client失败: %v", err)
return resp, nil
}
device.client = client
// 构建目标URI
recipient := sip.Uri{
@@ -1379,16 +1383,7 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (*
},
}
userAgentHeader := sip.NewHeader("User-Agent", "WVP-Pro v2.7.3.20241218")
//Via: SIP/2.0/UDP 192.168.1.106:5060;branch=z9hG4bK9279674404;rport
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: "UDP",
Host: "192.168.1.106",
Port: 5060,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", "z9hG4bK9279674404").Add("rport", "")
// 不手动添加Via头部让Client自动创建
csqHeader := sip.CSeqHeader{
SeqNo: 3,
@@ -1400,7 +1395,6 @@ func (gb *GB28181Plugin) TestSip(ctx context.Context, req *pb.TestSipRequest) (*
request.AppendHeader(subjectHeader)
request.AppendHeader(&toHeader)
request.AppendHeader(userAgentHeader)
request.AppendHeader(&viaHeader)
// 设置消息体
request.SetBody([]byte(strings.Join(sdpInfo, "\r\n") + "\r\n"))

View File

@@ -233,8 +233,11 @@ func (c *catalogHandlerTask) Run() (err error) {
if c.CustomChannelId == "" {
c.CustomChannelId = c.ChannelId
}
if c.CustomName == "" {
c.CustomName = c.Name
}
// 使用 Save 进行 upsert 操作
d.Debug("ready to addOrUpdateChannel", "channel.ID is", c.ID, "channel.Status is", c.Status)
d.Debug("ready to addOrUpdateChannel", "channel.ID is", c.ID, "channel.Status is", c.Status, "channel.Name", c.Name, "channel.Owner", c.Owner, "channel.Address", c.Address)
d.addOrUpdateChannel(c)
catalogReq.TotalCount++
}
@@ -330,17 +333,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
}
request.AppendHeader(&toHeader)
// 添加Via头部
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: platform.PlatformModel.Transport,
Host: platform.PlatformModel.DeviceIP,
Port: platform.PlatformModel.DevicePort,
Params: sip.NewParams(),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16)).Add("rport", "")
request.AppendHeader(&viaHeader)
// 不手动添加Via头部让Client自动创建并由TransportLayer填充正确的IP
// 设置Content-Type
contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml")
@@ -445,9 +438,23 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
func (d *Device) send(req *sip.Request) (*sip.Response, error) {
d.SN++
d.Trace("send", "req", req.String())
// 检查Via头部和Transport
via := req.Via()
transportBefore := req.Transport()
d.Info("send请求前", "device.Transport", d.Transport, "req.Transport()", transportBefore, "via.Transport", func() string {
if via != nil {
return via.Transport
}
return "nil"
}())
req.SetTransport(d.Transport)
return d.client.Do(context.Background(), req)
transportAfter := req.Transport()
d.Info("send请求SetTransport后", "req.Transport()", transportAfter)
d.Trace("send", "req", req.String())
return d.client.Do(d, req)
}
func (d *Device) Go() (err error) {
@@ -523,17 +530,20 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, Recipient any) *sip.Req
Address: sip.Uri{User: d.DeviceId, Host: d.HostAddress},
}
req.AppendHeader(&toHeader)
//viaHeader := sip.ViaHeader{
// ProtocolName: "SIP",
// ProtocolVersion: "2.0",
// Transport: "UDP",
// Host: d.SipIp,
// Port: d.LocalPort,
// Params: sip.HeaderParams(sip.NewParams()),
//}
//viaHeader.Params.Add("branch", sip.GenerateBranchN(10)).Add("rport", "")
//req.AppendHeader(&viaHeader)
req.AppendHeader(&d.contactHDR)
// 添加Via头部使用设备的Transport协议
// Via头部必须用PrependHeader放在最前面这样Client才能正确识别Transport
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: d.Transport, // 使用设备注册时的Transport
Host: d.SipIp,
Port: d.LocalPort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
req.PrependHeader(&viaHeader)
return req
}
@@ -754,14 +764,14 @@ func (d *Device) onNotify(req *sip.Request, tx sip.ServerTransaction, msg *gb281
if strings.Contains(string(notifyBody), "<Notify>") {
// 处理 Notify 通知
notify := &gb28181.AlarmNotify{}
if err := gb28181.DecodeXML(notify, notifyBody); err != nil {
if err := gb28181.DecodeXML(notify, notifyBody, d.Charset); err != nil {
return fmt.Errorf("decode notify xml error: %v", err)
}
if notify.CmdType == "MobilePosition" {
// 处理 MobilePosition 通知
posNotify := &gb28181.MobilePositionNotify{}
if err := gb28181.DecodeXML(posNotify, notifyBody); err != nil {
if err := gb28181.DecodeXML(posNotify, notifyBody, d.Charset); err != nil {
return fmt.Errorf("decode mobile position notify xml error: %v", err)
}
@@ -870,7 +880,7 @@ func (d *Device) onNotify(req *sip.Request, tx sip.ServerTransaction, msg *gb281
if strings.Contains(string(notifyBody), "<Response>") {
// 重新解析为 Response 消消息
response := &gb28181.Message{}
if err := gb28181.DecodeXML(response, notifyBody); err != nil {
if err := gb28181.DecodeXML(response, notifyBody, d.Charset); err != nil {
return fmt.Errorf("decode response xml error: %v", err)
}

View File

@@ -254,15 +254,6 @@ func (d *Dialog) Start() (err error) {
//customCallID := fmt.Sprintf("%s-%s-%d@%s", device.DeviceId, channelId, time.Now().Unix(), device.SipIp)
customCallID := fmt.Sprintf("%s@%s", GenerateCallID(32), device.MediaIp)
callID := sip.CallIDHeader(customCallID)
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: device.Transport,
Host: device.MediaIp,
Port: device.LocalPort,
Params: sip.NewParams(),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(10)).Add("rport", "")
maxforward := sip.MaxForwardsHeader(70)
//contentLengthHeader := sip.ContentLengthHeader(len(strings.Join(sdpInfo, "\r\n") + "\r\n"))
csqHeader := sip.CSeqHeader{
@@ -287,23 +278,34 @@ func (d *Dialog) Start() (err error) {
Params: sip.NewParams(),
}
fromHDR.Params.Add("tag", sip.GenerateTagN(32))
// 输出设备Transport信息用于调试
d.Info("准备发送INVITE", "deviceId", device.DeviceId, "device.Transport", device.Transport, "device.SipIp", device.SipIp, "device.LocalPort", device.LocalPort)
dialogClientCache := sipgo.NewDialogClientCache(device.client, contactHDR)
// 创建会话
d.Info("start to invite", "recipient:", recipient, " viaHeader:", viaHeader, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:",
// 创建会话不传递Via头部让Client自动创建
d.Info("start to invite", "recipient:", recipient, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:",
device.contactHDR, "contactHDR:", contactHDR, "sdpInfo:", strings.Join(sdpInfo, "\r\n")+"\r\n")
d.pullCtx.GoToStepConst(StepInviteSend)
// 判断当前系统类型
//if runtime.GOOS == "windows" {
// d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
//} else {
if strings.ToLower(device.Transport) == "tcp" {
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
} else {
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
// 创建Via头部使用设备的Transport协议
// Via头部必须放在第一个位置这样AppendHeader时Via会在最前面
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: device.Transport, // 使用设备注册时的Transport
Host: device.SipIp,
Port: device.LocalPort,
Params: sip.HeaderParams(sip.NewParams()),
}
//}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
d.Info("发送INVITE使用Transport", "transport", device.Transport, "via", viaHeader)
// Via头部必须是第一个参数这样即使用AppendHeaderVia也会在最前面
// 这样Client检查req.Via()时就能找到我们的Via头部不会再创建默认的UDP Via
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
// 最后添加Content-Length头部
if err != nil {
d.pullCtx.Fail("dialog invite error: " + err.Error())

View File

@@ -77,16 +77,24 @@ func (d *ForwardDialog) Start() (err error) {
// 注册对话到集合,使用类型转换
d.MediaPort = uint16(0)
d.Debug("ForwardDialog端口分配", "device.StreamMode", device.StreamMode, "StreamModeTCPActive", mrtp.StreamModeTCPActive)
if device.StreamMode != mrtp.StreamModeTCPActive {
if d.gb.MediaPort.Valid() {
d.Debug("ForwardDialog端口分配路径", "path", "tcpPB.Allocate()", "MediaPort.Valid", true)
var ok bool
d.MediaPort, ok = d.gb.tcpPB.Allocate()
if !ok {
return fmt.Errorf("no available tcp port")
}
d.Debug("ForwardDialog端口分配成功", "allocatedPort", d.MediaPort)
} else {
d.Debug("ForwardDialog端口分配路径", "path", "MediaPort[0]", "MediaPort.Valid", false)
d.MediaPort = d.gb.MediaPort[0]
d.Debug("ForwardDialog端口分配成功", "defaultPort", d.MediaPort)
}
} else {
d.Debug("ForwardDialog端口分配", "path", "StreamModeTCPActive不分配端口", "MediaPort", d.MediaPort)
}
// 使用上级平台的SSRC如果有或者设备的CreateSSRC方法
@@ -180,15 +188,6 @@ func (d *ForwardDialog) Start() (err error) {
recipient := device.Recipient
recipient.User = channelId
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: "UDP",
Host: device.SipIp,
Port: device.LocalPort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16)).Add("rport", "")
fromHDR := sip.FromHeader{
Address: sip.Uri{
User: d.gb.Serial,
@@ -201,11 +200,30 @@ func (d *ForwardDialog) Start() (err error) {
Address: sip.Uri{User: channelId, Host: channelId[0:10]},
}
fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 输出设备Transport信息用于调试
d.Info("ForwardDialog准备发送INVITE", "deviceId", device.DeviceId, "device.Transport", device.Transport, "device.SipIp", device.SipIp, "device.LocalPort", device.LocalPort)
// 创建会话 - 使用device的dialogClient创建
dialogClientCache := sipgo.NewDialogClientCache(device.client, device.contactHDR)
d.Info("start to invite", "recipient:", recipient, " viaHeader:", viaHeader, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:", device.contactHDR, "contactHDR:", device.contactHDR)
//d.session, err = dialogClientCache.Invite(d.gb, recipient, request.Body(), &fromHDR, &toHeader, &viaHeader, subjectHeader, &contentTypeHeader)
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), &fromHDR, &toHeader, subjectHeader, &contentTypeHeader)
d.Info("start to invite", "recipient:", recipient, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:", device.contactHDR, "contactHDR:", device.contactHDR)
// 创建Via头部使用设备的Transport协议
// Via头部必须放在第一个位置
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: device.Transport, // 使用设备注册时的Transport
Host: device.SipIp,
Port: device.LocalPort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
d.Info("ForwardDialog发送INVITE使用Transport", "transport", device.Transport, "via", viaHeader)
// Via头部必须是第一个参数
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &fromHDR, &toHeader, subjectHeader, &contentTypeHeader)
return
}

View File

@@ -57,6 +57,9 @@ type GB28181Plugin struct {
AutoMigrate bool `default:"true" desc:"自动迁移数据库结构并初始化根组织"`
ua *sipgo.UserAgent
server *sipgo.Server
clients util.Collection[string, *ClientWrapper] // Client池key为"IP:Port"
defaultSipIP string // 默认SIP IP
defaultSipPort int // 默认SIP Port
devices task.WorkCollection[string, *Device]
dialogs util.Collection[string, *Dialog]
forwardDialogs util.Collection[uint32, *ForwardDialog]
@@ -76,6 +79,16 @@ type GB28181Plugin struct {
downloadDialogs task.WorkCollection[string, *DownloadDialog]
}
// ClientWrapper 包装sipgo.Client以实现GetKey接口
type ClientWrapper struct {
*sipgo.Client
key string
}
func (c *ClientWrapper) GetKey() string {
return c.key
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
RegisterGRPCHandler: pb.RegisterApiHandler,
ServiceDesc: &pb.Api_ServiceDesc,
@@ -156,7 +169,11 @@ func (gb *GB28181Plugin) Start() (err error) {
return pkg.ErrNoDB
}
gb.Info("GB28181 initing", gb.Platforms)
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: false,
}))
slog.SetDefault(logger) // 设置为默认logger确保所有日志都使用这个配置
// 设置 TCP 传输模式
tcpOption := sip.WithTransportLayerConnectionReuse(true) // 启用连接重用
gb.ua, err = sipgo.NewUA(
@@ -173,6 +190,7 @@ func (gb *GB28181Plugin) Start() (err error) {
gb.dialogs.L = new(sync.RWMutex)
gb.forwardDialogs.L = new(sync.RWMutex)
gb.singlePorts.L = new(sync.RWMutex)
gb.clients.L = new(sync.RWMutex)
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnMessage(gb.OnMessage)
gb.server.OnRegister(gb.OnRegister)
@@ -226,6 +244,38 @@ func (gb *GB28181Plugin) Start() (err error) {
return err
}
}
// 初始化默认SIP配置
// 用于在无法从设备请求中确定本地IP时使用
gb.defaultSipIP = gb.SipIP
gb.defaultSipPort = 5060 // 默认端口
if gb.defaultSipIP == "" {
// 从第一个监听地址提取默认IP
if len(gb.Sip.ListenAddr) > 0 {
_, addr, _ := strings.Cut(gb.Sip.ListenAddr[0], ":")
if strings.HasPrefix(addr, ":") {
// 如果是 ":5060" 格式,提取端口
gb.defaultSipIP = "0.0.0.0"
if port, err := strconv.Atoi(strings.TrimPrefix(addr, ":")); err == nil {
gb.defaultSipPort = port
}
} else {
// 如果是 "192.168.1.106:5060" 格式提取IP和端口
host, portStr, _ := net.SplitHostPort(addr)
if host != "" {
gb.defaultSipIP = host
} else {
gb.defaultSipIP = addr
}
if port, err := strconv.Atoi(portStr); err == nil {
gb.defaultSipPort = port
}
}
}
}
gb.Info("默认SIP配置已初始化", "defaultSipIP", gb.defaultSipIP, "defaultSipPort", gb.defaultSipPort)
if gb.DB != nil {
err = gb.initDatabase()
if err != nil {
@@ -246,6 +296,45 @@ func (gb *GB28181Plugin) Start() (err error) {
return
}
// getOrCreateClient 根据IP:Port:Transport获取或创建Client
// hostname: SIP IP地址
// port: SIP端口
// transport: 传输协议TCP/UDP
func (gb *GB28181Plugin) getOrCreateClient(hostname string, port int, transport string) (*sipgo.Client, error) {
// Key包含transport因为同一个IP:Port可能同时有TCP和UDP
key := fmt.Sprintf("%s:%d:%s", hostname, port, strings.ToUpper(transport))
// 尝试从缓存获取
if wrapper, ok := gb.clients.Get(key); ok {
return wrapper.Client, nil
}
// 创建新Client
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: false,
}))
client, err := sipgo.NewClient(gb.ua,
sipgo.WithClientLogger(logger),
sipgo.WithClientHostname(hostname),
sipgo.WithClientPort(port),
)
if err != nil {
return nil, fmt.Errorf("创建Client失败 %s: %v", key, err)
}
// 存入缓存需要包装成实现GetKey的类型
wrapper := &ClientWrapper{
Client: client,
key: key,
}
gb.clients.Set(wrapper)
gb.Info("创建新Client", "hostname", hostname, "port", port, "key", key)
return client, nil
}
func (gb *GB28181Plugin) deleteDevice(device *Device, reason string) bool {
gb.Info(fmt.Sprintf("准备删除设备: %s", reason), "deviceId", device.DeviceId)
@@ -346,15 +435,17 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
User: device.DeviceId,
}
// 创建SIP客户端
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
// 根据设备的SipIp、LocalPort和Transport获取或创建对应的Client
transport := device.Transport
if transport == "" {
transport = "UDP" // 默认UDP
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(device.SipIp))
client, err := gb.getOrCreateClient(device.SipIp, device.LocalPort, transport)
if err != nil {
gb.Error("创建Device Client失败", "error", err, "deviceId", device.DeviceId, "sipIp", device.SipIp, "localPort", device.LocalPort, "transport", transport)
continue
}
device.client = client
device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR)
device.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice m7s.IPullProxy) bool {
@@ -543,18 +634,6 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
}
func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
// 解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body())
gb.Debug("OnMessage debug", "message", temp.BasicParam.Expiration)
if err != nil {
gb.Error("OnMessage", "error", err.Error())
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
from := req.From()
if from == nil || from.Address.User == "" {
gb.Error("OnMessage", "error", "no user")
@@ -562,12 +641,29 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
}
id := from.Address.User
// 检查消息来源
// 检查消息来源,获取字符集配置
var d *Device
var p *gb28181.PlatformModel
var charset string = "GB2312" // 默认字符集
// 先从设备缓存中获取
d, _ = gb.devices.Get(id)
if d != nil && d.Charset != "" {
charset = d.Charset
}
// 使用正确的字符集解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body(), charset)
gb.Debug("OnMessage debug", "message", temp.BasicParam.Expiration, "charset", charset)
if err != nil {
gb.Error("OnMessage", "error", err.Error(), "charset", charset)
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
// 检查是否是平台
//if gb.DB != nil {
@@ -623,19 +719,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
}
func (gb *GB28181Plugin) OnNotify(req *sip.Request, tx sip.ServerTransaction) {
// 解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body())
gb.Debug("onnotify debug", "message", temp)
if err != nil {
gb.Error("OnNotify", "error", err.Error())
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
from := req.From()
if from == nil || from.Address.User == "" {
gb.Error("OnNotify", "error", "no user")
@@ -643,12 +726,29 @@ func (gb *GB28181Plugin) OnNotify(req *sip.Request, tx sip.ServerTransaction) {
}
id := from.Address.User
// 检查消息来源
// 检查消息来源,获取字符集配置
var d *Device
var p *gb28181.PlatformModel
var charset string = "GB2312" // 默认字符集
// 先从设备缓存中获取
d, _ = gb.devices.Get(id)
if d != nil && d.Charset != "" {
charset = d.Charset
}
// 使用正确的字符集解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body(), charset)
gb.Debug("onnotify debug", "message", temp, "charset", charset)
if err != nil {
gb.Error("OnNotify", "error", err.Error(), "charset", charset)
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
// 检查是否是平台
if gb.DB != nil {
@@ -977,7 +1077,7 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
return dialog.platformCallId == callID
}); ok {
if forwardDialog.channel.StreamPath == "" { //为空表示是正常的GB设备
pullUrl := fmt.Sprintf("%s/%s", forwardDialog.channel.ParentId, forwardDialog.channel.ChannelId)
pullUrl := fmt.Sprintf("%s/%s", util.Conditional(forwardDialog.channel.DeviceId == "", forwardDialog.channel.ParentId, forwardDialog.channel.DeviceId), forwardDialog.channel.ChannelId)
streamPath := fmt.Sprintf("platform_%d/%s/%s", time.Now().UnixMilli(), forwardDialog.channel.DeviceId, forwardDialog.channel.ChannelId)
// 创建配置

View File

@@ -9,7 +9,6 @@ import (
"strings"
"time"
"golang.org/x/net/html/charset"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform"
)
@@ -243,14 +242,134 @@ type (
}
)
func DecodeXML(v any, body []byte) error {
decoder := xml.NewDecoder(bytes.NewReader(body))
decoder.CharsetReader = charset.NewReaderLabel
// DecodeXML 根据指定的字符集解码XML
// charset: 字符集,如 "GB2312", "UTF-8" 等如果为空则默认使用GB2312
func DecodeXML(v any, body []byte, charset string) error {
// 标准化字符集名称
if charset == "" {
charset = "GB2312" // 默认使用GB2312
}
charset = strings.ToUpper(strings.TrimSpace(charset))
// 提取XML声明中的encoding
declaredEncoding := extractXMLEncoding(body)
// 判断是否需要转换编码
needConvert := false
if declaredEncoding != "" {
declaredEncoding = strings.ToUpper(strings.TrimSpace(declaredEncoding))
// 如果声明的encoding与配置的charset不一致需要转换
if !isSameCharset(declaredEncoding, charset) {
needConvert = true
}
}
var finalBody []byte
if needConvert {
// 需要转换根据配置的charset决定如何处理
switch charset {
case "GB2312", "GBK", "GB18030":
// 配置说实际是GB2312先用GBK解码转成UTF-8
reader := transform.NewReader(bytes.NewReader(body), simplifiedchinese.GBK.NewDecoder())
utf8Body, err := io.ReadAll(reader)
if err != nil {
return fmt.Errorf("convert from %s to UTF-8 failed: %v", charset, err)
}
// 修改XML声明为UTF-8
finalBody = replaceXMLEncoding(utf8Body, "UTF-8")
case "UTF-8", "UTF8":
// 配置说实际是UTF-8但声明不是UTF-8
// 只需要修改XML声明不转换内容
finalBody = replaceXMLEncoding(body, "UTF-8")
default:
return fmt.Errorf("unsupported charset: %s", charset)
}
} else {
// 不需要转换直接使用原始body
finalBody = body
}
// 解析XML
decoder := xml.NewDecoder(bytes.NewReader(finalBody))
decoder.CharsetReader = func(declaredCharset string, input io.Reader) (io.Reader, error) {
// 如果XML声明的是GB2312/GBK提供GBK解码器
dc := strings.ToUpper(strings.TrimSpace(declaredCharset))
switch dc {
case "GB2312", "GBK", "GB18030":
return transform.NewReader(input, simplifiedchinese.GBK.NewDecoder()), nil
default:
return input, nil
}
}
err := decoder.Decode(v)
if err != nil {
decoder = xml.NewDecoder(transform.NewReader(bytes.NewReader(body), simplifiedchinese.GBK.NewDecoder()))
decoder.CharsetReader = charset.NewReaderLabel
return decoder.Decode(v)
return fmt.Errorf("decode XML failed: %v", err)
}
return nil
}
// extractXMLEncoding 从XML中提取encoding声明
func extractXMLEncoding(body []byte) string {
// 查找 encoding="xxx" 或 encoding='xxx'
bodyStr := string(body[:min(len(body), 200)]) // 只检查前200字节
if idx := strings.Index(bodyStr, "encoding="); idx >= 0 {
start := idx + 9
if start < len(bodyStr) {
quote := bodyStr[start]
if quote == '"' || quote == '\'' {
end := strings.IndexByte(bodyStr[start+1:], quote)
if end >= 0 {
return bodyStr[start+1 : start+1+end]
}
}
}
}
return ""
}
// isSameCharset 判断两个字符集是否相同
func isSameCharset(a, b string) bool {
a = strings.ToUpper(strings.TrimSpace(a))
b = strings.ToUpper(strings.TrimSpace(b))
// GB2312, GBK, GB18030 视为相同
if (a == "GB2312" || a == "GBK" || a == "GB18030") &&
(b == "GB2312" || b == "GBK" || b == "GB18030") {
return true
}
// UTF-8 和 UTF8 视为相同
if (a == "UTF-8" || a == "UTF8") && (b == "UTF-8" || b == "UTF8") {
return true
}
return a == b
}
// replaceXMLEncoding 替换XML声明中的encoding
func replaceXMLEncoding(body []byte, newEncoding string) []byte {
bodyStr := string(body)
if idx := strings.Index(bodyStr, "encoding="); idx >= 0 {
start := idx + 9
if start < len(bodyStr) {
quote := bodyStr[start]
if quote == '"' || quote == '\'' {
end := strings.IndexByte(bodyStr[start+1:], quote)
if end >= 0 {
// 替换encoding值
return []byte(bodyStr[:start+1] + newEncoding + bodyStr[start+1+end:])
}
}
}
}
return body
}
func min(a, b int) int {
if a < b {
return a
}
return b
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/langhuihui/gotask"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform"
"m7s.live/v5"
@@ -46,14 +47,37 @@ type Platform struct {
register *Register
}
// UTF8ToGB2312 将UTF-8编码的字符串转换为GB2312编码
func UTF8ToGB2312(s string) (string, error) {
reader := transform.NewReader(bytes.NewReader([]byte(s)), simplifiedchinese.GB18030.NewEncoder())
// EncodeToCharset 将UTF-8编码的字符串转换为指定字符集的字节数组
// charset: 目标字符集,支持 "GB2312", "GBK", "UTF-8" 等
func EncodeToCharset(s string, charset string) ([]byte, string, error) {
// 标准化字符集名称
charset = strings.ToUpper(strings.TrimSpace(charset))
if charset == "" {
charset = "GB2312" // 默认使用GB2312
}
// 如果是UTF-8直接返回
if charset == "UTF-8" || charset == "UTF8" {
return []byte(s), "UTF-8", nil
}
// 选择编码器
var encoder *encoding.Encoder
switch charset {
case "GB2312", "GBK", "GB18030":
encoder = simplifiedchinese.GB18030.NewEncoder()
default:
// 不支持的字符集返回UTF-8
return []byte(s), "UTF-8", fmt.Errorf("unsupported charset: %s, using UTF-8", charset)
}
// 执行编码转换
reader := transform.NewReader(bytes.NewReader([]byte(s)), encoder)
d, err := io.ReadAll(reader)
if err != nil {
return "", err
return []byte(s), "UTF-8", fmt.Errorf("encoding failed: %v", err)
}
return string(d), nil
return d, charset, nil
}
func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181Plugin, unRegister bool) *Platform {
@@ -62,9 +86,16 @@ func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181Plugin, unRegister bo
plugin: plugin,
unRegister: unRegister,
}
client, err := sipgo.NewClient(p.plugin.ua, sipgo.WithClientHostname(p.PlatformModel.DeviceIP), sipgo.WithClientPort(p.PlatformModel.DevicePort))
// Platform根据配置的DeviceIP、DevicePort和Transport获取或创建对应的Client
// 这样可以支持多网卡场景每个Platform可以使用不同的网卡和传输协议
transport := pm.Transport
if transport == "" {
transport = "UDP" // 默认UDP
}
client, err := plugin.getOrCreateClient(pm.DeviceIP, pm.DevicePort, transport)
if err != nil {
p.Error("failed to create sip client", "err", err)
plugin.Error("创建Platform Client失败", "error", err, "deviceIP", pm.DeviceIP, "devicePort", pm.DevicePort, "transport", transport)
return nil
}
p.Client = client
userAgentHeader := sip.NewHeader("User-Agent", "M7S/"+m7s.Version)
@@ -347,13 +378,29 @@ func (p *Platform) Register(isUnregister bool) error {
// 创建新的带认证信息的请求
newReq := req.Clone()
newReq.RemoveHeader("Via") // 必须由传输层重新生成
newReq.RemoveHeader("Via")
newReq.AppendHeader(sip.NewHeader("Authorization", cred.String()))
newReq.CSeq().SeqNo = uint32(p.SN) // 更新CSeq序号
p.SN++
// 手动添加Via头部使用平台配置的Transport
transport := p.PlatformModel.Transport
if transport == "" {
transport = "UDP"
}
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: transport,
Host: p.PlatformModel.DeviceIP,
Port: p.PlatformModel.DevicePort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
newReq.PrependHeader(viaHeader)
// 发送认证请求
tx, err = p.Client.TransactionRequest(p, newReq, sipgo.ClientRequestAddVia)
tx, err = p.Client.TransactionRequest(p, newReq)
if err != nil {
p.plugin.Error(logTag, "error", err.Error())
return err
@@ -501,7 +548,24 @@ func (p *Platform) handleCatalog(req *sip.Request, tx sip.ServerTransaction, msg
// CreateRequest 创建 SIP 请求
func (p *Platform) CreateRequest(method string) *sip.Request {
request := sip.NewRequest(sip.RequestMethod(method), p.Recipient)
//request.SetDestination(p.Recipient.String())
// 添加Via头部使用平台配置的Transport协议
// Via头部必须用PrependHeader放在最前面
transport := p.PlatformModel.Transport
if transport == "" {
transport = "UDP" // 默认UDP
}
viaHeader := sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: transport,
Host: p.PlatformModel.DeviceIP,
Port: p.PlatformModel.DevicePort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
request.PrependHeader(&viaHeader)
return request
}
@@ -544,11 +608,15 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
//request.AppendHeader(&viaHeader)
request.SetTransport(req.Transport())
contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml")
request.AppendHeader(&contentTypeHeader)
// 根据平台配置的字符集进行编码
charset := p.PlatformModel.CharacterSet
if charset == "" {
charset = "GB2312" // 默认使用GB2312
}
// 空目录列表XML
xmlContent := fmt.Sprintf(`<?xml version="1.0" encoding="GB2312"?>
xmlContent := fmt.Sprintf(`<?xml version="1.0" encoding="%s"?>
<Response>
<CmdType>Catalog</CmdType>
<SN>%s</SN>
@@ -556,8 +624,17 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
<SumNum>0</SumNum>
<DeviceList Num="0">
</DeviceList>
</Response>`, sn, p.PlatformModel.DeviceGBID)
request.SetBody([]byte(xmlContent))
</Response>`, charset, sn, p.PlatformModel.DeviceGBID)
encodedContent, actualCharset, err := EncodeToCharset(xmlContent, charset)
if err != nil {
p.Error("sendCatalogResponse 编码转换失败", "error", err.Error(), "charset", charset)
request.SetBody([]byte(xmlContent))
actualCharset = "UTF-8" // 失败时使用UTF-8
} else {
request.SetBody(encodedContent)
}
request.AppendHeader(sip.NewHeader("Content-Type", fmt.Sprintf("Application/MANSCDP+xml;charset=%s", actualCharset)))
// 修正使用TransactionRequest替代Do
tx, err := p.Client.TransactionRequest(p, request)
@@ -621,11 +698,27 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
// 创建新的带认证信息的请求
newReq := request.Clone()
newReq.RemoveHeader("Via") // 必须由传输层重新生成
newReq.RemoveHeader("Via")
newReq.AppendHeader(sip.NewHeader("Authorization", cred.String()))
// 手动添加Via头部使用平台配置的Transport
transport := p.PlatformModel.Transport
if transport == "" {
transport = "UDP"
}
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: transport,
Host: p.PlatformModel.DeviceIP,
Port: p.PlatformModel.DevicePort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
newReq.PrependHeader(viaHeader)
// 发送认证请求
tx, err = p.Client.TransactionRequest(p, newReq, sipgo.ClientRequestAddVia)
tx, err = p.Client.TransactionRequest(p, newReq)
if err != nil {
p.Error("sendCatalogResponse", "error", err.Error())
return err
@@ -686,12 +779,16 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
//request.AppendHeader(&viaHeader)
request.SetTransport(req.Transport())
contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml")
request.AppendHeader(&contentTypeHeader)
// 根据平台配置的字符集进行编码
charset := p.PlatformModel.CharacterSet
if charset == "" {
charset = "GB2312" // 默认使用GB2312
}
// 为单个通道创建XML
channelXML := p.buildChannelItem(channel)
xmlContent := fmt.Sprintf(`<?xml version="1.0" encoding="GB2312"?>
xmlContent := fmt.Sprintf(`<?xml version="1.0" encoding="%s"?>
<Response>
<CmdType>Catalog</CmdType>
<SN>%s</SN>
@@ -700,9 +797,17 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
<DeviceList Num="1">
%s
</DeviceList>
</Response>`, sn, p.PlatformModel.DeviceGBID, len(channels), channelXML)
</Response>`, charset, sn, p.PlatformModel.DeviceGBID, len(channels), channelXML)
request.SetBody([]byte(xmlContent))
encodedContent, actualCharset, err := EncodeToCharset(xmlContent, charset)
if err != nil {
p.Error("sendCatalogResponse 编码转换失败", "error", err.Error(), "charset", charset, "channel_index", i)
request.SetBody([]byte(xmlContent))
actualCharset = "UTF-8" // 失败时使用UTF-8
} else {
request.SetBody(encodedContent)
}
request.AppendHeader(sip.NewHeader("Content-Type", fmt.Sprintf("Application/MANSCDP+xml;charset=%s", actualCharset)))
// 修正使用TransactionRequest替代Do
tx, err := p.Client.TransactionRequest(p, request)
@@ -768,11 +873,27 @@ func (p *Platform) sendCatalogResponse(req *sip.Request, sn string, fromTag stri
// 创建新的带认证信息的请求
newReq := request.Clone()
newReq.RemoveHeader("Via") // 必须由传输层重新生成
newReq.RemoveHeader("Via")
newReq.AppendHeader(sip.NewHeader("Authorization", cred.String()))
// 手动添加Via头部使用平台配置的Transport
transport := p.PlatformModel.Transport
if transport == "" {
transport = "UDP"
}
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: transport,
Host: p.PlatformModel.DeviceIP,
Port: p.PlatformModel.DevicePort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
newReq.PrependHeader(viaHeader)
// 发送认证请求
tx, err = p.Client.TransactionRequest(p, newReq, sipgo.ClientRequestAddVia)
tx, err = p.Client.TransactionRequest(p, newReq)
if err != nil {
p.Error("sendCatalogResponse", "error", err.Error(), "channel_index", i)
return err
@@ -852,7 +973,7 @@ func (p *Platform) buildChannelItem(channel gb28181.DeviceChannel) string {
channel.RegisterWay, // 直接使用整数值
channel.Secrecy, // 直接使用整数值
parentID,
channel.Parental, // 直接使用整数值
channel.Parental, // 直接使用整数值
channel.SafetyWay) // 直接使用整数值
}
@@ -1199,16 +1320,20 @@ func (p *Platform) sendDeviceInfoResponse(req *sip.Request, device *Device, sn s
</Response>`, sn, device.DeviceId, device.Name, device.Manufacturer, device.Model, device.Firmware, device.ChannelCount)
}
// 将UTF-8编码的XML内容转换为GB2312编码
gb2312Content, err := UTF8ToGB2312(xmlContent)
if err != nil {
p.Error("sendDeviceInfoResponse", "encoding error", err.Error())
// 如果转换失败,仍然使用原始内容,避免完全失败
request.SetBody([]byte(xmlContent))
} else {
// 使用转换后的GB2312编码内容
request.SetBody([]byte(gb2312Content))
// 根据平台配置的字符集进行编码
charset := p.PlatformModel.CharacterSet
if charset == "" {
charset = "GB2312" // 默认使用GB2312
}
encodedContent, actualCharset, err := EncodeToCharset(xmlContent, charset)
if err != nil {
p.Error("sendDeviceInfoResponse 编码转换失败", "error", err.Error(), "charset", charset)
request.SetBody([]byte(xmlContent))
actualCharset = "UTF-8" // 失败时使用UTF-8
} else {
request.SetBody(encodedContent)
}
request.AppendHeader(sip.NewHeader("Content-Type", fmt.Sprintf("Application/MANSCDP+xml;charset=%s", actualCharset)))
// 修正:使用正确的上下文参数
tx, err := p.Client.TransactionRequest(p, request)
@@ -1272,11 +1397,27 @@ func (p *Platform) sendDeviceInfoResponse(req *sip.Request, device *Device, sn s
// 创建新的带认证信息的请求
newReq := request.Clone()
newReq.RemoveHeader("Via") // 必须由传输层重新生成
newReq.RemoveHeader("Via")
newReq.AppendHeader(sip.NewHeader("Authorization", cred.String()))
// 手动添加Via头部使用平台配置的Transport
transport := p.PlatformModel.Transport
if transport == "" {
transport = "UDP"
}
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: transport,
Host: p.PlatformModel.DeviceIP,
Port: p.PlatformModel.DevicePort,
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
newReq.PrependHeader(viaHeader)
// 发送认证请求
tx, err = p.Client.TransactionRequest(p, newReq, sipgo.ClientRequestAddVia)
tx, err = p.Client.TransactionRequest(p, newReq)
if err != nil {
p.Error("sendDeviceInfoResponse", "error", err.Error())
return err

View File

@@ -3,18 +3,15 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"log/slog"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
myip "github.com/husanpao/ip"
"github.com/icholy/digest"
"github.com/langhuihui/gotask"
task "github.com/langhuihui/gotask"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
@@ -304,14 +301,13 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
d.RegisterTime = time.Now()
d.Online = true
d.Transport = req.Transport()
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
// 根据设备访问的本地IP、端口和传输协议获取或创建对应的Client
client, err := task.gb.getOrCreateClient(d.SipIp, d.LocalPort, d.Transport)
if err != nil {
task.gb.Error("创建Device Client失败", "error", err, "sipIp", d.SipIp, "localPort", d.LocalPort, "transport", d.Transport)
return
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp))
d.client = client
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.plugin = task.gb
@@ -441,14 +437,13 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
d.Logger = task.gb.Logger.With("deviceid", deviceid)
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
opts := &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
// 根据设备访问的本地IP、端口和传输协议获取或创建对应的Client
client, err := task.gb.getOrCreateClient(d.SipIp, d.LocalPort, d.Transport)
if err != nil {
task.gb.Error("创建Device Client失败", "error", err, "sipIp", d.SipIp, "localPort", d.LocalPort, "transport", d.Transport)
return
}
logHandler := slog.NewJSONHandler(os.Stdout, opts)
logger := slog.New(logHandler)
slog.SetDefault(logger) // 设置为默认日志记录器
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(logger), sipgo.WithClientHostname(d.SipIp))
d.client = client
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.Info("StoreDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "req.Recipient", req.Recipient, "myPort", myPort, "d.Recipient", d.Recipient)

View File

@@ -29,9 +29,11 @@ type ForwardConfig struct {
// Forwarder 转发器
type Forwarder struct {
config *ForwardConfig
source net.Conn
target net.Conn
config *ForwardConfig
source net.Conn
target net.Conn
sourceListener net.Listener // 保存source的listener用于cleanup时关闭
targetListener net.Listener // 保存target的listener用于cleanup时关闭
}
// NewForwarder 创建新的转发器
@@ -53,21 +55,32 @@ func (f *Forwarder) establishSourceConnection(config ConnectionConfig) (net.Conn
return netConn, nil
case StreamModeTCPPassive:
listener, err := net.Listen("tcp4", fmt.Sprintf(":%d", config.Port))
addr := fmt.Sprintf(":%d", config.Port)
fmt.Printf("[Forwarder] TCP-PASSIVE: 开始监听端口 %s\n", addr)
listener, err := net.Listen("tcp4", addr)
if err != nil {
fmt.Printf("[Forwarder] TCP-PASSIVE: 监听失败 %s, err=%v\n", addr, err)
return nil, fmt.Errorf("listen failed: %v", err)
}
fmt.Printf("[Forwarder] TCP-PASSIVE: 监听成功 %s, listener=%p\n", addr, listener)
// 保存listener用于cleanup时关闭
f.sourceListener = listener
// Set timeout for accepting connections
if tcpListener, ok := listener.(*net.TCPListener); ok {
tcpListener.SetDeadline(time.Now().Add(30 * time.Second))
}
fmt.Printf("[Forwarder] TCP-PASSIVE: 等待连接 %s\n", addr)
netConn, err := listener.Accept()
if err != nil {
fmt.Printf("[Forwarder] TCP-PASSIVE: Accept失败 %s, err=%v, 关闭listener\n", addr, err)
listener.Close()
f.sourceListener = nil
return nil, fmt.Errorf("accept failed: %v", err)
}
fmt.Printf("[Forwarder] TCP-PASSIVE: Accept成功 %s, conn=%p, listener=%p 已保存到f.sourceListener\n", addr, netConn, listener)
return netConn, nil
@@ -104,6 +117,9 @@ func (f *Forwarder) establishTargetConnection(config ConnectionConfig) (net.Conn
return nil, fmt.Errorf("listen failed: %v", err)
}
// 保存listener用于cleanup时关闭
f.targetListener = listener
// Set timeout for accepting connections
if tcpListener, ok := listener.(*net.TCPListener); ok {
tcpListener.SetDeadline(time.Now().Add(30 * time.Second))
@@ -112,6 +128,7 @@ func (f *Forwarder) establishTargetConnection(config ConnectionConfig) (net.Conn
netConn, err := listener.Accept()
if err != nil {
listener.Close()
f.targetListener = nil
return nil, fmt.Errorf("accept failed: %v", err)
}
@@ -153,12 +170,32 @@ func (f *Forwarder) setupConnections() error {
// cleanup 清理连接
func (f *Forwarder) cleanup() {
fmt.Printf("[Forwarder] cleanup: 开始清理, source=%p, target=%p, sourceListener=%p, targetListener=%p\n",
f.source, f.target, f.sourceListener, f.targetListener)
// 先关闭连接
if f.source != nil {
fmt.Printf("[Forwarder] cleanup: 关闭source连接 %p\n", f.source)
f.source.Close()
}
if f.target != nil {
fmt.Printf("[Forwarder] cleanup: 关闭target连接 %p\n", f.target)
f.target.Close()
}
// 再关闭listener重要释放端口
if f.sourceListener != nil {
fmt.Printf("[Forwarder] cleanup: ✅ 关闭sourceListener %p释放端口\n", f.sourceListener)
f.sourceListener.Close()
f.sourceListener = nil
}
if f.targetListener != nil {
fmt.Printf("[Forwarder] cleanup: ✅ 关闭targetListener %p释放端口\n", f.targetListener)
f.targetListener.Close()
f.targetListener = nil
}
fmt.Printf("[Forwarder] cleanup: 清理完成\n")
}
// createRTPReader 创建RTP读取器
@@ -454,23 +491,38 @@ func (p *RTPProcessor) processFragmentedPacket(packet *rtp.Packet, sequenceNumbe
// Forward 执行转发
func (f *Forwarder) Forward(ctx context.Context) error {
fmt.Printf("[Forwarder] Forward: 开始, source=%s:%d mode=%s, target=%s:%d mode=%s\n",
f.config.Source.IP, f.config.Source.Port, f.config.Source.Mode,
f.config.Target.IP, f.config.Target.Port, f.config.Target.Mode)
// 建立连接
err := f.setupConnections()
if err != nil {
fmt.Printf("[Forwarder] Forward: setupConnections失败, err=%v\n", err)
return err
}
defer f.cleanup()
fmt.Printf("[Forwarder] Forward: setupConnections成功, source=%p, target=%p\n", f.source, f.target)
defer func() {
fmt.Printf("[Forwarder] Forward: 准备执行cleanup (defer)\n")
f.cleanup()
}()
// 检查是否为中继模式
if f.config.Relay {
fmt.Printf("[Forwarder] Forward: 使用中继模式\n")
processor := NewRelayProcessor(f.source, f.target, f.config.Source.Mode, f.config.Target.Mode)
return processor.Process(ctx)
err := processor.Process(ctx)
fmt.Printf("[Forwarder] Forward: 中继模式结束, err=%v\n", err)
return err
}
// RTP处理模式
fmt.Printf("[Forwarder] Forward: 使用RTP处理模式\n")
reader := f.createRTPReader()
writer := f.createRTPWriter()
processor := NewRTPProcessor(reader, writer, f.config)
return processor.Process(ctx)
err = processor.Process(ctx)
fmt.Printf("[Forwarder] Forward: RTP处理模式结束, err=%v\n", err)
return err
}