Compare commits

...

17 Commits

Author SHA1 Message Date
charlestamz
dcabca1ee8 Merge pull request #45 from INeverCry/v3
1.目录订阅head添加Contact,部分厂商必须要该字段 2.添加订阅设备位置 3.添加Notify消息处理
2022-04-11 10:26:56 +08:00
lqh
7414285731 1.目录订阅head添加Contact,部分厂商必须要该字段
2.添加订阅设备位置
3.添加Notify消息处理
2022-04-09 23:00:49 +08:00
charlie
931f8d888d 1. added LogVerbose parameter in config
2. fixed  problems that public IP address communicates to private networking devices
****attention:
language spec was updated to 1.18
2022-04-05 05:19:18 +08:00
dexter
c9709d0e48 Merge pull request #42 from GeekChengZ/v3
Update channel.go
2022-03-08 13:54:05 +08:00
GeekCheng
61441cc372 Update channel.go
查询所有录像产生类型
2022-03-08 11:11:15 +08:00
charlie
803543fbdf 处理invite发送过快导致callid随机字符串一致的问题 2022-02-28 14:10:33 +08:00
charlie
03a2aeec2c tested on more cameras 2022-02-16 14:26:15 +08:00
charlie
8bb92f87f1 fix self-channel becoming children 2022-02-11 11:35:23 +08:00
charlie
3b9f9abc4a dependency fix 2022-02-10 14:54:10 +08:00
charlie
813447b355 完善gb28181协议 2022-02-10 11:28:30 +08:00
charlie
81827cb8c9 thread safe improvement 2022-02-09 00:50:02 +08:00
charlie
4cbffb84b8 refactor 2022-01-27 18:48:00 +08:00
charlie
5b4e520be7 first working code 2022-01-27 14:26:10 +08:00
charlie
cb1bafc269 first compiled code 2022-01-26 18:26:31 +08:00
dexter
1a013b5ebc Merge pull request #38 from charlestamz/v3
fixed memory leak
2022-01-21 08:53:44 +08:00
charlie
63eba38094 fixed memory leak 2022-01-21 00:14:27 +08:00
dexter
3f03146f5c 修改sip请求等待时间 2022-01-19 13:42:15 +08:00
35 changed files with 1614 additions and 1566 deletions

View File

@@ -3,21 +3,22 @@ package gb28181
import (
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
. "github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/utils"
. "github.com/Monibuca/utils/v3"
)
type ChannelEx struct {
device *Device
inviteRes *sip.Message
recordInviteRes *sip.Message
device *Device `json:"-"`
inviteRes *Response `json:"-"`
recordInviteRes *Response `json:"-"`
RecordPublisher *Publisher
LivePublisher *Publisher
LiveSubSP string //实时子码流
@@ -28,6 +29,9 @@ type ChannelEx struct {
recordEndTime time.Time
state int32
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //纬度
}
// Channel 通道
@@ -45,18 +49,19 @@ type Channel struct {
RegisterWay int
Secrecy int
Status string
Children []*Channel
*ChannelEx //自定义属性
Children []*Channel `json:"-"`
*ChannelEx //自定义属性
}
func (c *Channel) CreateMessage(Method sip.Method) (requestMsg *sip.Message) {
requestMsg = c.device.CreateMessage(Method)
requestMsg.StartLine.Uri = sip.NewURI(c.DeviceID + "@" + c.device.to.Uri.Domain())
requestMsg.To = &sip.Contact{
Uri: requestMsg.StartLine.Uri,
func (c *Channel) CreateRequst(Method Method) (request *Request) {
request = &Request{}
request.Message = c.device.CreateMessage(Method)
request.Message.StartLine.Uri = NewURI(c.DeviceID + "@" + c.device.to.Uri.Domain())
request.Message.To = &Contact{
Uri: request.Message.StartLine.Uri,
}
requestMsg.From = &sip.Contact{
Uri: sip.NewURI(config.Serial + "@" + config.Realm),
request.Message.From = &Contact{
Uri: NewURI(config.Serial + "@" + config.Realm),
Params: map[string]string{"tag": utils.RandNumString(9)},
}
return
@@ -68,7 +73,7 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
channel.recordStartTime, _ = time.Parse(TIME_LAYOUT, startTime)
channel.recordEndTime, _ = time.Parse(TIME_LAYOUT, endTime)
channel.Records = nil
requestMsg := channel.CreateMessage(sip.MESSAGE)
requestMsg := channel.CreateRequst(MESSAGE)
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = fmt.Sprintf(`<?xml version="1.0"?>
<Query>
@@ -78,14 +83,19 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
<StartTime>%s</StartTime>
<EndTime>%s</EndTime>
<Secrecy>0</Secrecy>
<Type>time</Type>
<Type>all</Type>
</Query>`, d.sn, requestMsg.To.Uri.UserInfo(), startTime, endTime)
requestMsg.ContentLength = len(requestMsg.Body)
return d.SendMessage(requestMsg).Code
resp, err := d.SipRequestForResponse(requestMsg)
if err != nil {
return http.StatusRequestTimeout
}
return resp.GetStatusCode()
}
func (channel *Channel) Control(PTZCmd string) int {
d := channel.device
requestMsg := channel.CreateMessage(sip.MESSAGE)
requestMsg := channel.CreateRequst(MESSAGE)
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = fmt.Sprintf(`<?xml version="1.0"?>
<Control>
@@ -95,7 +105,11 @@ func (channel *Channel) Control(PTZCmd string) int {
<PTZCmd>%s</PTZCmd>
</Control>`, d.sn, requestMsg.To.Uri.UserInfo(), PTZCmd)
requestMsg.ContentLength = len(requestMsg.Body)
return d.SendMessage(requestMsg).Code
resp, err := d.SipRequestForResponse(requestMsg)
if err != nil {
return http.StatusRequestTimeout
}
return resp.GetStatusCode()
}
/*
@@ -203,18 +217,22 @@ func (channel *Channel) Invite(start, end string) (code int) {
if config.TCP {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
}
invite := channel.CreateMessage(sip.INVITE)
invite := channel.CreateRequst(INVITE)
invite.ContentType = "application/sdp"
invite.Contact = &sip.Contact{
Uri: sip.NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
invite.Contact = &Contact{
Uri: NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
}
invite.Body = strings.Join(sdpInfo, "\r\n") + "\r\ny=" + string(ssrc) + "\r\n"
invite.ContentLength = len(invite.Body)
invite.Subject = fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, ssrc, config.Serial)
response := d.SendMessage(invite)
fmt.Printf("invite response statuscode: %d\n", response.Code)
if response.Code == 200 {
ds := strings.Split(response.Data.Body, "\r\n")
response, _ := d.Core.SipRequestForResponse(invite)
if response == nil {
return http.StatusRequestTimeout
}
Printf("Channel :%s invite response status code: %d\n", channel.DeviceID, response.GetStatusCode())
if response.GetStatusCode() == 200 {
ds := strings.Split(response.Body, "\r\n")
_SSRC, _ := strconv.ParseInt(string(ssrc), 10, 0)
SSRC := uint32(_SSRC)
for _, l := range ds {
@@ -240,7 +258,7 @@ func (channel *Channel) Invite(start, end string) (code int) {
publisher.OnClose = func() {
publishers.Remove(SSRC)
channel.LivePublisher = nil
channel.ByeBye(channel.inviteRes)
channel.ByeBye((*Request)(channel.inviteRes))
channel.inviteRes = nil
atomic.StoreInt32(&channel.state, 0)
if config.AutoInvite {
@@ -252,7 +270,7 @@ func (channel *Channel) Invite(start, end string) (code int) {
publisher.OnClose = func() {
publishers.Remove(SSRC)
channel.RecordPublisher = nil
channel.ByeBye(channel.recordInviteRes)
channel.ByeBye((*Request)(channel.recordInviteRes))
channel.recordInviteRes = nil
}
}
@@ -261,28 +279,28 @@ func (channel *Channel) Invite(start, end string) (code int) {
}
publishers.Add(SSRC, publisher)
if start == "" {
channel.inviteRes = response.Data
channel.inviteRes = response
channel.LivePublisher = publisher
} else {
channel.RecordPublisher = publisher
channel.recordInviteRes = response.Data
channel.recordInviteRes = response
}
ack := d.CreateMessage(sip.ACK)
ack.StartLine = &sip.StartLine{
Uri: sip.NewURI(channel.DeviceID + "@" + d.to.Uri.Domain()),
Method: sip.ACK,
ack := d.CreateMessage(ACK)
ack.StartLine = &StartLine{
Uri: NewURI(channel.DeviceID + "@" + d.to.Uri.Domain()),
Method: ACK,
}
ack.From = response.Data.From
ack.To = response.Data.To
ack.CallID = response.Data.CallID
ack.From = response.From
ack.To = response.To
ack.CallID = response.CallID
ack.CSeq.ID = invite.CSeq.ID
go d.Send(ack)
d.Respond(&Response{Message: ack})
} else if start == "" && config.AutoInvite {
time.AfterFunc(time.Second*5, func() {
channel.Invite("", "")
})
}
return response.Code
return response.GetStatusCode()
}
func (channel *Channel) Bye(live bool) int {
if live && channel.inviteRes != nil {
@@ -292,7 +310,7 @@ func (channel *Channel) Bye(live bool) int {
channel.LivePublisher.Close()
}
}()
return channel.ByeBye(channel.inviteRes).Code
return channel.ByeBye((*Request)(channel.inviteRes)).GetStatusCode()
}
if !live && channel.recordInviteRes != nil {
defer func() {
@@ -301,21 +319,27 @@ func (channel *Channel) Bye(live bool) int {
channel.RecordPublisher.Close()
}
}()
return channel.ByeBye(channel.recordInviteRes).Code
return channel.ByeBye((*Request)(channel.recordInviteRes)).GetStatusCode()
}
return 404
}
func (c *Channel) ByeBye(res *sip.Message) *transaction.Response {
func (c *Channel) ByeBye(res *Request) *Response {
if res == nil {
return nil
}
bye := c.device.CreateMessage(sip.BYE)
bye.StartLine = &sip.StartLine{
Uri: sip.NewURI(c.DeviceID + "@" + c.device.to.Uri.Domain()),
Method: sip.BYE,
d := c.device
bye := c.device.CreateMessage(BYE)
bye.StartLine = &StartLine{
Uri: NewURI(c.DeviceID + "@" + c.device.to.Uri.Domain()),
Method: BYE,
}
bye.From = res.From
bye.To = res.To
bye.CallID = res.CallID
return c.device.SendMessage(bye)
req := &Request{}
req.Message = bye
resp, _ := d.SipRequestForResponse(req)
return resp
}

267
device.go
View File

@@ -2,6 +2,9 @@ package gb28181
import (
"fmt"
"log"
"net"
"net/http"
"strings"
"sync"
"time"
@@ -10,7 +13,7 @@ import (
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
"github.com/Monibuca/plugin-gb28181/v3/utils"
// . "github.com/Monibuca/utils/v3"
. "github.com/Monibuca/utils/v3"
// . "github.com/logrusorgru/aurora"
)
@@ -33,11 +36,22 @@ func (r *Record) GetPublishStreamPath() string {
return fmt.Sprintf("%s/%s", r.DeviceID, r.StartTime)
}
var (
Devices sync.Map
DeviceNonce sync.Map //保存nonce防止设备伪造
DeviceRegisterCount sync.Map //设备注册次数
)
type Device struct {
*transaction.Core `json:"-"`
ID string
Name string
Manufacturer string
Model string
Owner string
RegisterTime time.Time
UpdateTime time.Time
LastKeepaliveAt time.Time
Status string
Channels []*Channel
sn int
@@ -45,6 +59,7 @@ type Device struct {
to *sip.Contact
Addr string
SipIP string //暴露的IP
SourceAddr net.Addr
channelMap map[string]*Channel
channelMutex sync.RWMutex
subscriber struct {
@@ -84,7 +99,9 @@ func (d *Device) UpdateChannels(list []*Channel) {
path := strings.Split(c.ParentID, "/")
parentId := path[len(path)-1]
if parent, ok := d.channelMap[parentId]; ok {
parent.Children = append(parent.Children, c)
if c.DeviceID != parentId {
parent.Children = append(parent.Children, c)
}
} else {
d.addChannel(c)
}
@@ -93,24 +110,25 @@ func (d *Device) UpdateChannels(list []*Channel) {
}
if old, ok := d.channelMap[c.DeviceID]; ok {
c.ChannelEx = old.ChannelEx
if len(old.Children) == 0 {
if config.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime && n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
if config.AutoInvite && c.LivePublisher == nil {
go c.Invite("", "")
if config.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
if config.AutoInvite &&
(c.LivePublisher == nil || (c.LivePublisher.VideoTracks.Size == 0 && c.LivePublisher.AudioTracks.Size == 0)) {
c.Invite("", "")
}
} else {
c.ChannelEx = &ChannelEx{
device: d,
}
if config.AutoInvite {
go c.Invite("", "")
c.Invite("", "")
}
}
if s := engine.FindStream("sub/" + c.DeviceID); s != nil {
@@ -153,6 +171,29 @@ func (d *Device) CreateMessage(Method sip.Method) (requestMsg *sip.Message) {
}, CallID: utils.RandNumString(10),
Addr: d.Addr,
}
var err2 error
requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
if err2 != nil {
return nil
}
//intranet ip , let's resolve it with public ip
var deviceIp, deviceSourceIP net.IP
switch addr := requestMsg.DestAdd.(type) {
case *net.UDPAddr:
deviceIp = addr.IP
case *net.TCPAddr:
deviceIp = addr.IP
}
switch addr2 := d.SourceAddr.(type) {
case *net.UDPAddr:
deviceSourceIP = addr2.IP
case *net.TCPAddr:
deviceSourceIP = addr2.IP
}
if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
requestMsg.DestAdd = d.SourceAddr
}
return
}
func (d *Device) Subscribe() int {
@@ -164,42 +205,184 @@ func (d *Device) Subscribe() int {
requestMsg.Event = "Catalog"
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(requestMsg.Expires))
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = fmt.Sprintf(`<?xml version="1.0" encoding="gb2312"?>
<Query>
<CmdType>Catalog</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
</Query>`, d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
response := d.SendMessage(requestMsg)
if response.Code == 200 {
d.subscriber.CallID = requestMsg.CallID
} else {
d.subscriber.CallID = ""
requestMsg.Contact = &sip.Contact{
Uri: sip.NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
}
return response.Code
requestMsg.Body = sip.BuildCatalogXML(d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
request := &sip.Request{Message: requestMsg}
response, err := d.Core.SipRequestForResponse(request)
if err == nil && response != nil {
if response.GetStatusCode() == 200 {
d.subscriber.CallID = requestMsg.CallID
} else {
d.subscriber.CallID = ""
}
return response.GetStatusCode()
}
return http.StatusRequestTimeout
}
func (d *Device) Query() {
func (d *Device) Catalog() int {
requestMsg := d.CreateMessage(sip.MESSAGE)
requestMsg.Expires = 3600
requestMsg.Event = "Catalog"
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(requestMsg.Expires))
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = sip.BuildCatalogXML(d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
request := &sip.Request{Message: requestMsg}
response, err := d.Core.SipRequestForResponse(request)
if err == nil && response != nil {
return response.GetStatusCode()
}
return http.StatusRequestTimeout
}
func (d *Device) QueryDeviceInfo(req *sip.Request) {
for i := time.Duration(5); i < 100; i++ {
Printf("device.QueryDeviceInfo:%s ipaddr:%s", d.ID, d.Addr)
time.Sleep(time.Second * i)
requestMsg := d.CreateMessage(sip.MESSAGE)
requestMsg.ContentType = "Application/MANSCDP+xml"
requestMsg.Body = fmt.Sprintf(`<?xml version="1.0" encoding="gb2312"?>
<Query>
<CmdType>Catalog</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
</Query>`, d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.Body = sip.BuildDeviceInfoXML(d.sn, requestMsg.To.Uri.UserInfo())
requestMsg.ContentLength = len(requestMsg.Body)
response := d.SendMessage(requestMsg)
if response.Data != nil && response.Data.Via.Params["received"] != "" {
d.SipIP = response.Data.Via.Params["received"]
}
if response.Code != 200 {
fmt.Printf("device %s send Catalog : %d\n", d.ID, response.Code)
} else {
d.Subscribe()
break
request := &sip.Request{Message: requestMsg}
response, _ := d.Core.SipRequestForResponse(request)
if response != nil {
if response.Via != nil && response.Via.Params["received"] != "" {
d.SipIP = response.Via.Params["received"]
}
if response.GetStatusCode() != 200 {
Printf("device %s send Catalog : %d\n", d.ID, response.GetStatusCode())
} else {
d.Subscribe()
break
}
}
}
}
// MobilePositionSubscribe 移动位置订阅
func (d *Device) MobilePositionSubscribe(id string, expires int, interval int) (code int) {
mobilePosition := d.CreateMessage(sip.SUBSCRIBE)
if d.subscriber.CallID != "" {
mobilePosition.CallID = d.subscriber.CallID
}
mobilePosition.Expires = expires
mobilePosition.Event = "presence"
mobilePosition.Contact = &sip.Contact{
Uri: sip.NewURI(fmt.Sprintf("%s@%s:%d", d.Serial, d.SipIP, d.SipPort)),
}
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(mobilePosition.Expires))
mobilePosition.ContentType = "Application/MANSCDP+xml"
mobilePosition.Body = sip.BuildDevicePositionXML(d.sn, id, interval)
mobilePosition.ContentLength = len(mobilePosition.Body)
msg := &sip.Request{Message: mobilePosition}
response, err := d.Core.SipRequestForResponse(msg)
if err == nil && response != nil {
if response.GetStatusCode() == 200 {
d.subscriber.CallID = mobilePosition.CallID
} else {
d.subscriber.CallID = ""
}
return response.GetStatusCode()
}
return http.StatusRequestTimeout
}
// UpdateChannelPosition 更新通道GPS坐标
func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
if c, ok := d.channelMap[channelId]; ok {
c.ChannelEx.GpsTime, _ = time.ParseInLocation("2006-01-02 15:04:05", gpsTime, time.Local)
c.ChannelEx.Longitude = lng
c.ChannelEx.Latitude = lat
log.Printf("更新通道[%s]坐标成功\n", c.Name)
} else {
log.Printf("更新失败,未找到通道[%s]\n", channelId)
}
}
// UpdateChannelStatus 目录订阅消息处理:新增/移除/更新通道或者更改通道状态
func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
for _, v := range deviceList {
switch v.Event {
case "ON":
log.Println("收到通道上线通知")
d.channelOnline(v.DeviceID)
case "OFF":
log.Println("收到通道离线通知")
d.channelOffline(v.DeviceID)
case "VLOST":
log.Println("收到通道视频丢失通知")
d.channelOffline(v.DeviceID)
case "DEFECT":
log.Println("收到通道故障通知")
d.channelOffline(v.DeviceID)
case "ADD":
log.Println("收到通道新增通知")
channel := Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
Manufacturer: v.Manufacturer,
Model: v.Model,
Owner: v.Owner,
CivilCode: v.CivilCode,
Address: v.Address,
Parental: v.Parental,
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addChannel(&channel)
case "DEL":
//删除
log.Println("收到通道删除通知")
delete(d.channelMap, v.DeviceID)
case "UPDATE":
fmt.Println("收到通道更新通知")
// 更新通道
channel := &Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
Manufacturer: v.Manufacturer,
Model: v.Model,
Owner: v.Owner,
CivilCode: v.CivilCode,
Address: v.Address,
Parental: v.Parental,
SafetyWay: v.SafetyWay,
RegisterWay: v.RegisterWay,
Secrecy: v.Secrecy,
Status: v.Status,
}
channels := []*Channel{channel}
d.UpdateChannels(channels)
}
}
}
func (d *Device) channelOnline(DeviceID string) {
if c, ok := d.channelMap[DeviceID]; ok {
c.Status = "ON"
log.Printf("通道[%s]在线\n", c.Name)
} else {
log.Printf("更新通道[%s]状态失败,未找到\n", DeviceID)
}
}
func (d *Device) channelOffline(DeviceID string) {
if c, ok := d.channelMap[DeviceID]; ok {
c.Status = "OFF"
log.Printf("通道[%s]离线\n", c.Name)
} else {
log.Printf("更新通道[%s]状态失败,未找到\n", DeviceID)
}
}

3
go.mod
View File

@@ -3,9 +3,10 @@ module github.com/Monibuca/plugin-gb28181/v3
go 1.13
require (
github.com/Monibuca/engine/v3 v3.4.5
github.com/Monibuca/engine/v3 v3.5.0
github.com/Monibuca/utils/v3 v3.0.5
github.com/agiledragon/gomonkey/v2 v2.2.0
github.com/google/uuid v1.3.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/pion/rtp v1.7.4
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2

63
go.sum
View File

@@ -1,12 +1,16 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/Monibuca/engine/v3 v3.4.5 h1:gPRsliBVC70EhKK7sJz/3LoNO7lfnXNC8uUaOIHZkUE=
github.com/Monibuca/engine/v3 v3.4.5/go.mod h1:Dik9pFxU9TFI5vj8Sv5QXZM+ooCs2fm9P7Uhe4yYNkQ=
github.com/Monibuca/engine/v3 v3.5.0 h1:hkuOdEXlnjcUhgDqOBPcrJ6fgv+HuWuMSpYhqvT3pfc=
github.com/Monibuca/engine/v3 v3.5.0/go.mod h1:yNiVKeHxgv+Ez+f2RHXMkXoa5Oxv+G7Ch+MJdHi7ing=
github.com/Monibuca/utils/v3 v3.0.5 h1:w14x0HkWTbF4MmHbINLlOwe4VJNoSOeaQChMk5E/4es=
github.com/Monibuca/utils/v3 v3.0.5/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE=
github.com/agiledragon/gomonkey/v2 v2.2.0 h1:QJWqpdEhGV/JJy70sZ/LDnhbSlMrqHAWHcNOjz1kyuI=
github.com/agiledragon/gomonkey/v2 v2.2.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY=
github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs=
github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4=
@@ -15,8 +19,9 @@ github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusO
github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg=
github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo=
github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
@@ -33,6 +38,11 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
@@ -48,6 +58,7 @@ github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko
github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA=
github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -58,41 +69,77 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

219
handle.go Normal file
View File

@@ -0,0 +1,219 @@
package gb28181
import (
"bytes"
"encoding/xml"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
"github.com/Monibuca/plugin-gb28181/v3/utils"
"github.com/logrusorgru/aurora"
"net/http"
"time"
. "github.com/Monibuca/utils/v3"
"golang.org/x/net/html/charset"
)
func OnRegister(req *sip.Request, tx *transaction.GBTx) {
id := req.From.Uri.UserInfo()
passAuth := false
// 不需要密码情况
if config.Username == "" && config.Password == "" {
passAuth = true
} else {
// 需要密码情况 设备第一次上报返回401和加密算法
if req.Authorization != nil && req.Authorization.GetUsername() != "" {
// 有些摄像头没有配置用户名的地方用户名就是摄像头自己的国标id
var username string
if req.Authorization.GetUsername() == id {
username = id
} else {
username = config.Username
}
if dc, ok := DeviceRegisterCount.LoadOrStore(id, 1); ok && dc.(int) > MaxRegisterCount {
var response sip.Response
response.Message = req.BuildResponse(http.StatusForbidden)
_ = tx.Respond(&response)
return
} else {
// 设备第二次上报,校验
_nonce, loaded := DeviceNonce.Load(id)
if loaded && req.Authorization.Verify(username, config.Password, config.Realm, _nonce.(string)) {
passAuth = true
} else {
DeviceRegisterCount.Store(id, dc.(int)+1)
}
}
}
}
if passAuth {
storeDevice(id, tx.Core, req.Message)
DeviceNonce.Delete(id)
DeviceRegisterCount.Delete(id)
m := req.BuildOK()
resp := &sip.Response{Message: m}
_ = tx.Respond(resp)
} else {
var response sip.Response
response.Message = req.BuildResponseWithPhrase(401, "Unauthorized")
_nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
response.WwwAuthenticate = sip.NewWwwAuthenticate(config.Realm, _nonce.(string), sip.DIGEST_ALGO_MD5)
response.SourceAdd = req.DestAdd
response.DestAdd = req.SourceAdd
_ = tx.Respond(&response)
}
}
func OnMessage(req *sip.Request, tx *transaction.GBTx) {
if v, ok := Devices.Load(req.From.Uri.UserInfo()); ok {
d := v.(*Device)
d.SourceAddr = req.SourceAdd
if d.Status == string(sip.REGISTER) {
d.Status = "ONLINE"
go d.QueryDeviceInfo(req)
}
d.UpdateTime = time.Now()
temp := &struct {
XMLName xml.Name
CmdType string
DeviceID string
DeviceName string
Manufacturer string
Model string
Channel string
DeviceList []*Channel `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body)))
decoder.CharsetReader = charset.NewReaderLabel
err := decoder.Decode(temp)
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body))
if err != nil {
Printf("decode catelog err: %s", err)
}
}
var body string
switch temp.CmdType {
case "Keepalive":
d.LastKeepaliveAt = time.Now()
//callID !="" 说明是订阅的事件类型信息
if d.Channels == nil {
go d.Catalog()
} else {
if d.subscriber.CallID != "" && d.LastKeepaliveAt.After(d.subscriber.Timeout) {
go d.Catalog()
} else {
for _, c := range d.Channels {
if config.AutoInvite &&
(c.LivePublisher == nil || (c.LivePublisher.VideoTracks.Size == 0 && c.LivePublisher.AudioTracks.Size == 0)) {
c.Invite("", "")
}
}
}
}
d.CheckSubStream()
case "Catalog":
d.UpdateChannels(temp.DeviceList)
case "RecordInfo":
d.UpdateRecord(temp.DeviceID, temp.RecordList)
case "DeviceInfo":
// 主设备信息
d.Name = temp.DeviceName
d.Manufacturer = temp.Manufacturer
d.Model = temp.Model
case "Alarm":
d.Status = "Alarmed"
body = sip.BuildAlarmResponseXML(d.ID)
default:
Println("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:\n", req.Body)
response := &sip.Response{req.BuildResponse(http.StatusBadRequest)}
tx.Respond(response)
return
}
buildOK := req.BuildOK()
buildOK.Body = body
response := &sip.Response{buildOK}
tx.Respond(response)
}
}
func onBye(req *sip.Request, tx *transaction.GBTx) {
response := &sip.Response{req.BuildOK()}
_ = tx.Respond(response)
}
// OnNotify 订阅通知处理
func OnNotify(req *sip.Request, tx *transaction.GBTx) {
if v, ok := Devices.Load(req.From.Uri.UserInfo()); ok {
d := v.(*Device)
d.UpdateTime = time.Now()
temp := &struct {
XMLName xml.Name
CmdType string
DeviceID string
Time string //位置订阅-GPS时间
Longitude string //位置订阅-经度
Latitude string //位置订阅-维度
// Speed string //位置订阅-速度(km/h)(可选)
// Direction string //位置订阅-方向(取值为当前摄像头方向与正北方的顺时针夹角,取值范围0°~360°,单位:°)(可选)
// Altitude string //位置订阅-海拔高度,单位:m(可选)
DeviceList []*notifyMessage `xml:"DeviceList>Item"` //目录订阅
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body)))
decoder.CharsetReader = charset.NewReaderLabel
err := decoder.Decode(temp)
if err != nil {
err = utils.DecodeGbk(temp, []byte(req.Body))
if err != nil {
Printf("decode catelog err: %s", err)
}
}
var body string
switch temp.CmdType {
case "Catalog":
//目录状态
d.UpdateChannelStatus(temp.DeviceList)
case "MobilePosition":
//更新channel的坐标
d.UpdateChannelPosition(temp.DeviceID, temp.Time, temp.Longitude, temp.Latitude)
// case "Alarm":
// //报警事件通知 TODO
default:
Println("DeviceID:", aurora.Red(d.ID), " Not supported CmdType : "+temp.CmdType+" body:\n", req.Body)
response := &sip.Response{req.BuildResponse(http.StatusBadRequest)}
tx.Respond(response)
return
}
buildOK := req.BuildOK()
buildOK.Body = body
response := &sip.Response{buildOK}
tx.Respond(response)
}
}
type notifyMessage struct {
DeviceID string
ParentID string
Name string
Manufacturer string
Model string
Owner string
CivilCode string
Address string
Parental int
SafetyWay int
RegisterWay int
Secrecy int
Status string
//状态改变事件 ON:上线,OFF:离线,VLOST:视频丢失,DEFECT:故障,ADD:增加,DEL:删除,UPDATE:更新(必选)
Event string
}

242
main.go
View File

@@ -2,8 +2,6 @@ package gb28181
import (
"bufio"
"bytes"
"encoding/xml"
"io"
"log"
"net"
@@ -15,19 +13,15 @@ import (
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
"github.com/Monibuca/plugin-gb28181/v3/utils"
. "github.com/Monibuca/utils/v3"
. "github.com/logrusorgru/aurora"
"github.com/pion/rtp"
"golang.org/x/net/html/charset"
)
var (
Devices sync.Map
DeviceNonce = make(map[string]string) //保存nonce防止设备伪造
DeviceRegisterCount = make(map[string]int) //设备注册次数
Ignores = make(map[string]struct{})
publishers Publishers
Ignores = make(map[string]struct{})
publishers Publishers
serverConfig *transaction.Config
)
const MaxRegisterCount = 3
@@ -79,7 +73,8 @@ var config = struct {
Username string
Password string
UdpCacheSize int //udp排序缓存
}{"34020000002000000001", "3402000000", "127.0.0.1:5060", 3600, 58200, false, -1, nil, false, 1, 600, false, "", "", 0}
LogVerbose bool
}{"34020000002000000001", "3402000000", "127.0.0.1:5060", 3600, 58200, false, -1, nil, false, 1, 600, false, "", "", 0, false}
func init() {
pc := engine.PluginConfig{
@@ -90,6 +85,44 @@ func init() {
publishers.data = make(map[uint32]*Publisher)
}
func storeDevice(id string, s *transaction.Core, req *sip.Message) {
var d *Device
if _d, loaded := Devices.Load(id); loaded {
d = _d.(*Device)
d.UpdateTime = time.Now()
d.from = &sip.Contact{Uri: req.StartLine.Uri, Params: make(map[string]string)}
d.to = req.To
d.Addr = req.Via.GetSendBy()
//TODO: Should we send GetDeviceInf request?
//message := d.CreateMessage(sip.MESSAGE)
//message.Body = sip.GetDeviceInfoXML(d.ID)
//request := &sip.Request{Message: message}
//if newTx, err := s.Request(request); err == nil {
// if _, err = newTx.SipResponse(); err != nil {
// Println("notify device after register,", err)
// return
// }
//}
} else {
Devices.Store(id, &Device{
ID: id,
RegisterTime: time.Now(),
UpdateTime: time.Now(),
Status: string(sip.REGISTER),
Core: s,
from: &sip.Contact{Uri: req.StartLine.Uri, Params: make(map[string]string)},
to: req.To,
Addr: req.Via.GetSendBy(),
SipIP: serverConfig.MediaIP,
channelMap: make(map[string]*Channel),
})
}
}
func run() {
ipAddr, err := net.ResolveUDPAddr("", config.ListenAddr)
if err != nil {
@@ -100,7 +133,7 @@ func run() {
Ignores[id] = struct{}{}
}
useTCP := config.TCP
config := &transaction.Config{
serverConfig = &transaction.Config{
SipIP: ipAddr.IP.String(),
SipPort: uint16(ipAddr.Port),
SipNetwork: "UDP",
@@ -119,7 +152,35 @@ func run() {
MediaIdleTimeout: 30,
RemoveBanInterval: config.RemoveBanInterval,
UdpCacheSize: config.UdpCacheSize,
LogVerbose: config.LogVerbose,
}
s := transaction.NewCore(serverConfig)
s.RegistHandler(sip.REGISTER, OnRegister)
s.RegistHandler(sip.MESSAGE, OnMessage)
s.RegistHandler(sip.NOTIFY, OnNotify)
s.RegistHandler(sip.BYE, onBye)
//OnStreamClosedHooks.AddHook(func(stream *Stream) {
// Devices.Range(func(key, value interface{}) bool {
// device:=value.(*Device)
// for _,channel := range device.Channels {
// if stream.StreamPath == channel.RecordSP {
//
// }
// }
// })
//})
if useTCP {
listenMediaTCP()
} else {
go listenMediaUDP()
}
// go queryCatalog(serverConfig)
if serverConfig.Username != "" || serverConfig.Password != "" {
go removeBanDevice(serverConfig)
}
http.HandleFunc("/api/gb28181/query/records", func(w http.ResponseWriter, r *http.Request) {
CORS(w, r)
id := r.URL.Query().Get("id")
@@ -139,7 +200,7 @@ func run() {
var list []*Device
Devices.Range(func(key, value interface{}) bool {
device := value.(*Device)
if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
if time.Since(device.UpdateTime) > time.Duration(serverConfig.RegisterValidity)*time.Second {
Devices.Delete(key)
} else {
list = append(list, device)
@@ -193,138 +254,28 @@ func run() {
w.WriteHeader(404)
}
})
s := transaction.NewCore(config)
s.OnRegister = func(msg *sip.Message) {
id := msg.From.Uri.UserInfo()
storeDevice := func() {
var d *Device
http.HandleFunc("/api/gb28181/position", func(w http.ResponseWriter, r *http.Request) {
CORS(w, r)
query := r.URL.Query()
//设备id
id := query.Get("id")
//订阅周期(单位:秒)
expires := query.Get("expires")
//订阅间隔(单位:秒)
interval := query.Get("interval")
if _d, loaded := Devices.LoadOrStore(id, &Device{
ID: id,
RegisterTime: time.Now(),
UpdateTime: time.Now(),
Status: string(sip.REGISTER),
Core: s,
from: &sip.Contact{Uri: msg.StartLine.Uri, Params: make(map[string]string)},
to: msg.To,
Addr: msg.Via.GetSendBy(),
SipIP: config.MediaIP,
channelMap: make(map[string]*Channel),
}); loaded {
d = _d.(*Device)
d.UpdateTime = time.Now()
d.from = &sip.Contact{Uri: msg.StartLine.Uri, Params: make(map[string]string)}
d.to = msg.To
d.Addr = msg.Via.GetSendBy()
}
}
// 不需要密码情况
if config.Username == "" && config.Password == "" {
storeDevice()
return
}
sendUnauthorized := func() {
response := msg.BuildResponseWithPhrase(401, "Unauthorized")
if DeviceNonce[id] == "" {
nonce := utils.RandNumString(32)
DeviceNonce[id] = nonce
}
response.WwwAuthenticate = sip.NewWwwAuthenticate(s.Realm, DeviceNonce[id], sip.DIGEST_ALGO_MD5)
s.Send(response)
}
// 需要密码情况 设备第一次上报返回401和加密算法
if msg.Authorization == nil || msg.Authorization.GetUsername() == "" {
sendUnauthorized()
return
}
// 有些摄像头没有配置用户名的地方用户名就是摄像头自己的国标id
username := config.Username
if msg.Authorization.GetUsername() == id {
username = id
}
expiresInt, _ := strconv.Atoi(expires)
intervalInt, _ := strconv.Atoi(interval)
if DeviceRegisterCount[id] >= MaxRegisterCount {
s.Send(msg.BuildResponse(403))
return
}
// 设备第二次上报,校验
if !msg.Authorization.Verify(username, config.Password, config.Realm, DeviceNonce[id]) {
sendUnauthorized()
DeviceRegisterCount[id] += 1
return
}
storeDevice()
delete(DeviceNonce, id)
delete(DeviceRegisterCount, id)
}
s.OnMessage = func(msg *sip.Message) bool {
if v, ok := Devices.Load(msg.From.Uri.UserInfo()); ok {
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
if d.Status == string(sip.REGISTER) {
d.Status = "ONLINE"
go d.Query()
}
d.UpdateTime = time.Now()
temp := &struct {
XMLName xml.Name
CmdType string
DeviceID string
DeviceList []*Channel `xml:"DeviceList>Item"`
RecordList []*Record `xml:"RecordList>Item"`
}{}
decoder := xml.NewDecoder(bytes.NewReader([]byte(msg.Body)))
decoder.CharsetReader = charset.NewReaderLabel
err := decoder.Decode(temp)
if err != nil {
err = utils.DecodeGbk(temp, []byte(msg.Body))
if err != nil {
log.Printf("decode catelog err: %s", err)
}
}
switch temp.XMLName.Local {
case "Notify":
switch temp.CmdType {
case "Keeyalive":
if d.subscriber.CallID != "" && time.Now().After(d.subscriber.Timeout) {
go d.Subscribe()
}
d.CheckSubStream()
case "Catalog":
d.UpdateChannels(temp.DeviceList)
}
case "Response":
switch temp.CmdType {
case "Catalog":
d.UpdateChannels(temp.DeviceList)
case "RecordInfo":
d.UpdateRecord(temp.DeviceID, temp.RecordList)
}
}
return true
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
} else {
w.WriteHeader(404)
}
return false
}
//OnStreamClosedHooks.AddHook(func(stream *Stream) {
// Devices.Range(func(key, value interface{}) bool {
// device:=value.(*Device)
// for _,channel := range device.Channels {
// if stream.StreamPath == channel.RecordSP {
//
// }
// }
// })
//})
if useTCP {
listenMediaTCP()
} else {
go listenMediaUDP()
}
// go queryCatalog(config)
if config.Username != "" || config.Password != "" {
go removeBanDevice(config)
}
s.Start()
})
s.StartAndWait()
}
func listenMediaTCP() {
for i := uint16(0); i < config.TCPMediaPortNum; i++ {
@@ -383,7 +334,7 @@ func listenMediaUDP() {
// if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
// Devices.Delete(key)
// } else if device.Channels != nil {
// go device.Subscribe()
// go device.Catalog()
// }
// return true
// })
@@ -393,10 +344,11 @@ func listenMediaUDP() {
func removeBanDevice(config *transaction.Config) {
t := time.NewTicker(time.Duration(config.RemoveBanInterval) * time.Second)
for range t.C {
for id, cnt := range DeviceRegisterCount {
if cnt >= MaxRegisterCount {
delete(DeviceRegisterCount, id)
DeviceRegisterCount.Range(func(key, value interface{}) bool {
if value.(int) > MaxRegisterCount {
DeviceRegisterCount.Delete(key)
}
}
return true
})
}
}

View File

@@ -9,7 +9,7 @@ import (
type Publisher struct {
*engine.Stream
parser utils.DecPSPackage
parser *utils.DecPSPackage
pushVideo func(uint32, uint32, []byte)
pushAudio func(uint32, []byte)
lastSeq uint16
@@ -90,9 +90,12 @@ func (p *Publisher) PushPS(rtp *rtp.Packet) {
}
p.lastSeq = rtp.SequenceNumber
p.Update()
if p.parser == nil {
p.parser = new(utils.DecPSPackage)
}
if len(ps) >= 4 && BigEndian.Uint32(ps) == utils.StartCodePS {
if p.parser.Len() > 0 {
p.parser.Uint32()
p.parser.Skip(4)
p.parser.Read(rtp.Timestamp, p)
p.parser.Reset()
}

View File

@@ -3,6 +3,7 @@ package sip
import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"time"
@@ -39,8 +40,10 @@ type Message struct {
Body string
Addr string
Event string
Date time.Time
Date time.Time
WwwAuthenticate *WwwAuthenticate //gb28181 密码验证 上级发给下级是WwwAuthenticate下级发给上级是Authorization
SourceAdd net.Addr
DestAdd net.Addr
}
func (m *Message) BuildResponse(code int) *Message {
@@ -58,18 +61,19 @@ func (m *Message) BuildResponseWithPhrase(code int, phrase string) *Message {
CSeq: m.CSeq,
Via: m.Via,
MaxForwards: m.MaxForwards,
UserAgent: "Monibuca",
UserAgent: "Monibuca",
StartLine: &StartLine{
Code: code,
phrase: phrase,
},
Date: time.Now(),
Date: time.Now(),
DestAdd: m.SourceAdd,
}
return &response
}
//z9hG4bK + 10个随机数字
func randBranch() string {
func RandBranch() string {
return fmt.Sprintf("z9hG4bK%s", utils.RandNumString(8))
}
@@ -93,7 +97,7 @@ func BuildMessageRequest(method Method, transport, sipSerial, sipRealm, username
Host: client,
}
msg.Via.Params = map[string]string{
"branch": randBranch(),
"branch": RandBranch(),
"rport": "-1", //only key,no-value
}
msg.From = &Contact{
@@ -136,8 +140,6 @@ func (m *Message) IsResponse() bool {
func (m *Message) GetMethod() Method {
if m.CSeq == nil {
b, _ := Encode(m)
println(string(b))
return MESSAGE
}
return m.CSeq.Method
@@ -179,12 +181,12 @@ func (m *Message) GetBranch() string {
//请求消息的source格式 host:port
func (m *Message) Source() string {
if m.Mode == SIP_MESSAGE_RESPONSE {
fmt.Println("only for request message")
//fmt.Println("only for request message")
return ""
}
if m.Via == nil {
fmt.Println("invalid request message")
//fmt.Println("invalid request message")
return ""
}
@@ -282,7 +284,7 @@ func Decode(data []byte) (msg *Message, err error) {
return
}
} else {
fmt.Println(firstline)
//fmt.Println(firstline)
}
}
continue
@@ -462,7 +464,7 @@ func Encode(msg *Message) ([]byte, error) {
sb.WriteString("Date: ")
sb.WriteString(msg.Date.Format("2006-01-02T15:04:05.999"))
sb.WriteString(CRLF)
}
}
if msg.Event != "" {
sb.WriteString("Event: ")
sb.WriteString(msg.Event)
@@ -485,7 +487,7 @@ func Encode(msg *Message) ([]byte, error) {
}
sb.WriteString("Content-Length: ")
sb.WriteString(strconv.Itoa(msg.ContentLength))
sb.WriteString(strconv.Itoa(len(msg.Body)))
sb.WriteString(CRLFCRLF)

69
sip/request.go Normal file
View File

@@ -0,0 +1,69 @@
package sip
import (
"fmt"
"time"
)
// Request Request
type Request struct {
*Message
}
var (
// CatalogXML 获取设备列表xml样式
CatalogXML = `<?xml version="1.0"?><Query>
<CmdType>Catalog</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
</Query>
`
// RecordInfoXML 获取录像文件列表xml样式
RecordInfoXML = `<?xml version="1.0"?>
<Query>
<CmdType>RecordInfo</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<StartTime>%s</StartTime>
<EndTime>%s</EndTime>
<Secrecy>0</Secrecy>
<Type>all</Type>
</Query>
`
// DeviceInfoXML 查询设备详情xml样式
DeviceInfoXML = `<?xml version="1.0"?>
<Query>
<CmdType>DeviceInfo</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
</Query>
`
// DevicePositionXML 订阅设备位置
DevicePositionXML = `<?xml version="1.0"?>
<Query>
<CmdType>MobilePosition</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<Interval>%d</Interval>
</Query>`
)
// BuildDeviceInfoXML 获取设备详情指令
func BuildDeviceInfoXML(sn int, id string) string {
return fmt.Sprintf(DeviceInfoXML, sn, id)
}
// BuildCatalogXML 获取NVR下设备列表指令
func BuildCatalogXML(sn int, id string) string {
return fmt.Sprintf(CatalogXML, sn, id)
}
// BuildRecordInfoXML 获取录像文件列表指令
func BuildRecordInfoXML(sn int, id string, start, end int64) string {
return fmt.Sprintf(RecordInfoXML, sn, id, time.Unix(start, 0).Format("2006-01-02T15:04:05"), time.Unix(end, 0).Format("2006-01-02T15:04:05"))
}
// BuildDevicePositionXML 订阅设备位置
func BuildDevicePositionXML(sn int, id string, interval int) string {
return fmt.Sprintf(DevicePositionXML, sn, id, interval)
}

25
sip/response.go Normal file
View File

@@ -0,0 +1,25 @@
package sip
import (
"fmt"
)
// Response Response
type Response struct {
*Message
}
// AlarmResponseXML alarm response xml样式
var (AlarmResponseXML = `<?xml version="1.0"?>
<Response>
<CmdType>Alarm</CmdType>
<SN>17430</SN>
<DeviceID>%s</DeviceID>
</Response>
`
)
// BuildRecordInfoXML 获取录像文件列表指令
func BuildAlarmResponseXML(id string) string {
return fmt.Sprintf(AlarmResponseXML, id)
}

View File

@@ -1,4 +1,4 @@
#### 事
#### 事
transaction事务是SIP的基本组成部分。
一个事务是客户发送的一个请求事务(通过通讯层)发送到一个服务器事务,连同服务器事务的所有的该请求的应答发送回客户端事务。
事务层处理应用服务层的重发,匹配请求的应答,以及应用服务层的超时。
@@ -6,12 +6,12 @@ transaction事务是SIP的基本组成部分。
用户代理包含一个事务层,来实现有状态的代理服务器。
事务层包含一个客户元素(可以认为是一个客户事务)和一个服务器元素(可以认为是一个服务器事务),他们都可以用一个有限状态机来处理特定的请求。
在状态机层面,事分为ict、ist、nict、nist四种。
但底层事方面,仅根据 method 等信息,分支处理。
在状态机层面,事分为ict、ist、nict、nist四种。
但底层事方面,仅根据 method 等信息,分支处理。
#### 关于事管理
#### 关于事管理
在map里面管理ID的选择是要和事匹配相关。
在map里面管理ID的选择是要和事匹配相关。
```
客户端事件的匹配
@@ -21,7 +21,7 @@ transaction事务是SIP的基本组成部分。
2.如果CSeq标头字段中的method参数与创建事务的请求的方法匹配。由于CANCEL请求会创建新的事务但共享相同的branch参数值。所以仅用branch参数是不够的
服务端事匹配
服务端事匹配
首先要检查请求中的Via头域的branch参数。如果他以”z9hG4bk”开头那么这个请求一定是由客户端事务根据本规范产生的。因此branch参数在该客户端发出的所有的事务中都是唯一的。根据下列规则我们可以判定请求是否和事务匹配
@@ -30,7 +30,8 @@ transaction事务是SIP的基本组成部分。
3、 请求的方法与创建事务的方法匹配但ACK除外在ACK中创建事务的请求的方法为INVITE。
```
所以,根据匹配规则,事的ID 使用 branch然后在匹配逻辑里面再做条件判断。而因为branch可能会重复所以如果使用map来简化transaction的管理key的取值应该
客户端事物: branch+method
服务端事物: branch + sendby + method,method中ack还要除外。所以只能用branch + sendby
所以,根据匹配规则,事的ID 使用 branch然后在匹配逻辑里面再做条件判断。而因为branch可能会重复所以如果使用map来简化transaction的管理key的取值应该
|type| key|
|----|----|
|客户端事务:| branch+method|
|服务端事务:| branch + sendby + method,method中ack还要除外。所以只能用branch + sendby|

View File

@@ -58,7 +58,8 @@ type Config struct {
MediaPortMax uint16
MediaIdleTimeout uint16 //推流超时时间,超过则断开链接,让设备重连
AudioEnable bool //是否开启音频
WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
RemoveBanInterval int //移除禁止设备间隔
UdpCacheSize int //udp缓存大小
LogVerbose bool
WaitKeyFrame bool //是否等待关键帧,如果等待,则在收到第一个关键帧之前,忽略所有媒体流
RemoveBanInterval int //移除禁止设备间隔
UdpCacheSize int //udp缓存大小
}

View File

@@ -3,25 +3,29 @@ package transaction
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/Monibuca/plugin-gb28181/v3/sip"
. "github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transport"
. "github.com/Monibuca/utils/v3"
"net"
"net/http"
"strconv"
"sync"
)
type RequestHandler func(req *Request, tx *GBTx)
//Core: transactions manager
//管理所有 transactions以及相关全局参数、运行状态机
type Core struct {
ctx context.Context //上下文
handlers map[State]map[Event]Handler //每个状态都可以处理有限个事件。不必加锁。
transactions map[string]*Transaction //管理所有 transactions,key:tid,value:transaction
mutex sync.RWMutex //transactions的锁
tp transport.ITransport //transport
*Config //sip server配置信息
OnRegister func(*sip.Message)
OnMessage func(*sip.Message) bool
ctx context.Context //上下文
hmu *sync.RWMutex
requestHandlers map[Method]RequestHandler
txs *GBTxs
tp transport.ITransport //transport
*Config //sip server配置信息
OnRegister func(msg *Request, tx *GBTx)
OnMessage func(msg *Request, tx *GBTx)
udpaddr net.Addr
}
//初始化一个 Core需要能响应请求也要能发起请求
@@ -31,163 +35,43 @@ type Core struct {
//TODO:通过context管理子线程
//TODO:单元测试
func NewCore(config *Config) *Core {
core := &Core{
handlers: make(map[State]map[Event]Handler),
transactions: make(map[string]*Transaction),
Config: config,
ctx: context.Background(),
ActiveTX = &GBTxs{
Txs: map[string]*GBTx{},
RWM: &sync.RWMutex{},
}
core := &Core{
requestHandlers: map[Method]RequestHandler{},
txs: ActiveTX,
Config: config,
ctx: context.Background(),
hmu: &sync.RWMutex{},
}
if config.SipNetwork == "TCP" {
core.tp = transport.NewTCPServer(config.SipPort, true)
} else {
core.tp = transport.NewUDPServer(config.SipPort)
}
//填充fsm
core.addICTHandler()
core.addISTHandler()
core.addNICTHandler()
core.addNISTHandler()
return core
}
//add transaction to core
func (c *Core) AddTransaction(ta *Transaction) {
c.mutex.Lock()
c.transactions[ta.id] = ta
c.mutex.Unlock()
func (c *Core) StartAndWait() {
go c.handlerListen()
_ = c.tp.StartAndWait()
}
//delete transaction
func (c *Core) DelTransaction(tid string) {
c.mutex.Lock()
delete(c.transactions, tid)
c.mutex.Unlock()
}
//创建事物
//填充此事物的参数via、from、to、callID、cseq
func (c *Core) initTransaction(ctx context.Context, tid string, m *sip.Message) *Transaction {
//ack要么属于一个invite事物要么由TU层直接管理不通过事物管理。
if m.GetMethod() == sip.ACK {
fmt.Println("ack nerver create transaction")
return nil
}
ta := &Transaction{
id: tid,
core: c,
response: make(chan *Response),
startAt: time.Now(),
endAt: time.Now().Add(1000000 * time.Hour),
}
ta.Context, ta.cancel = context.WithTimeout(ctx, time.Second*5)
//填充其他transaction的信息
ta.via = m.Via
ta.from = m.From
ta.to = m.To
ta.callID = m.CallID
ta.cseq = m.CSeq
ta.origRequest = m
return ta
}
//状态机初始化:ICT
func (c *Core) addICTHandler() {
c.addHandler(ICT_PRE_CALLING, SND_REQINVITE, ict_snd_invite)
c.addHandler(ICT_CALLING, TIMEOUT_A, osip_ict_timeout_a_event)
c.addHandler(ICT_CALLING, TIMEOUT_B, osip_ict_timeout_b_event)
c.addHandler(ICT_CALLING, RCV_STATUS_1XX, ict_rcv_1xx)
c.addHandler(ICT_CALLING, RCV_STATUS_2XX, ict_rcv_2xx)
c.addHandler(ICT_CALLING, RCV_STATUS_3456XX, ict_rcv_3456xx)
c.addHandler(ICT_PROCEEDING, RCV_STATUS_1XX, ict_rcv_1xx)
c.addHandler(ICT_PROCEEDING, RCV_STATUS_2XX, ict_rcv_2xx)
c.addHandler(ICT_PROCEEDING, RCV_STATUS_3456XX, ict_rcv_3456xx)
c.addHandler(ICT_COMPLETED, RCV_STATUS_3456XX, ict_retransmit_ack)
c.addHandler(ICT_COMPLETED, TIMEOUT_D, osip_ict_timeout_d_event)
}
//状态机初始化:IST
func (c *Core) addISTHandler() {
c.addHandler(IST_PRE_PROCEEDING, RCV_REQINVITE, ist_rcv_invite)
c.addHandler(IST_PROCEEDING, RCV_REQINVITE, ist_rcv_invite)
c.addHandler(IST_COMPLETED, RCV_REQINVITE, ist_rcv_invite)
c.addHandler(IST_COMPLETED, TIMEOUT_G, osip_ist_timeout_g_event)
c.addHandler(IST_COMPLETED, TIMEOUT_H, osip_ist_timeout_h_event)
c.addHandler(IST_PROCEEDING, SND_STATUS_1XX, ist_snd_1xx)
c.addHandler(IST_PROCEEDING, SND_STATUS_2XX, ist_snd_2xx)
c.addHandler(IST_PROCEEDING, SND_STATUS_3456XX, ist_snd_3456xx)
c.addHandler(IST_COMPLETED, RCV_REQACK, ist_rcv_ack)
c.addHandler(IST_CONFIRMED, RCV_REQACK, ist_rcv_ack)
c.addHandler(IST_CONFIRMED, TIMEOUT_I, osip_ist_timeout_i_event)
}
//状态机初始化:NICT
func (c *Core) addNICTHandler() {
c.addHandler(NICT_PRE_TRYING, SND_REQUEST, nict_snd_request)
c.addHandler(NICT_TRYING, TIMEOUT_F, osip_nict_timeout_f_event)
c.addHandler(NICT_TRYING, TIMEOUT_E, osip_nict_timeout_e_event)
c.addHandler(NICT_TRYING, RCV_STATUS_1XX, nict_rcv_1xx)
c.addHandler(NICT_TRYING, RCV_STATUS_2XX, nict_rcv_23456xx)
c.addHandler(NICT_TRYING, RCV_STATUS_3456XX, nict_rcv_23456xx)
c.addHandler(NICT_PROCEEDING, TIMEOUT_F, osip_nict_timeout_f_event)
c.addHandler(NICT_PROCEEDING, TIMEOUT_E, osip_nict_timeout_e_event)
c.addHandler(NICT_PROCEEDING, RCV_STATUS_1XX, nict_rcv_1xx)
c.addHandler(NICT_PROCEEDING, RCV_STATUS_2XX, nict_rcv_23456xx)
c.addHandler(NICT_PROCEEDING, RCV_STATUS_3456XX, nict_rcv_23456xx)
c.addHandler(NICT_COMPLETED, TIMEOUT_K, osip_nict_timeout_k_event)
}
//状态机初始化:NIST
func (c *Core) addNISTHandler() {
c.addHandler(NIST_PRE_TRYING, RCV_REQUEST, nist_rcv_request)
c.addHandler(NIST_TRYING, SND_STATUS_1XX, nist_snd_1xx)
c.addHandler(NIST_TRYING, SND_STATUS_2XX, nist_snd_23456xx)
c.addHandler(NIST_TRYING, SND_STATUS_3456XX, nist_snd_23456xx)
c.addHandler(NIST_PROCEEDING, SND_STATUS_1XX, nist_snd_1xx)
c.addHandler(NIST_PROCEEDING, SND_STATUS_2XX, nist_snd_23456xx)
c.addHandler(NIST_PROCEEDING, SND_STATUS_3456XX, nist_snd_23456xx)
c.addHandler(NIST_PROCEEDING, RCV_REQUEST, nist_rcv_request)
c.addHandler(NIST_COMPLETED, TIMEOUT_J, osip_nist_timeout_j_event)
c.addHandler(NIST_COMPLETED, RCV_REQUEST, nist_rcv_request)
}
//状态机初始化根据state 匹配到对应的状态机
func (c *Core) addHandler(state State, event Event, handler Handler) {
m := c.handlers
if state >= DIALOG_CLOSE {
fmt.Println("invalid state:", state)
return
}
if event >= UNKNOWN_EVT {
fmt.Println("invalid event:", event)
return
}
if _, ok := m[state]; !ok {
m[state] = make(map[Event]Handler)
}
if _, ok := m[state][event]; ok {
fmt.Printf("state:%d,event:%d, has been exist\n", state, event)
} else {
m[state][event] = handler
}
}
func (c *Core) Start() {
go c.Handler()
c.tp.Start()
}
func (c *Core) Handler() {
func (c *Core) handlerListen() {
//阻塞读取消息
for p := range c.tp.ReadPacketChan() {
if len(p.Data) < 5 {
continue
}
if c.LogVerbose {
Println("Received: \n", string(p.Data))
}
if err := c.HandleReceiveMessage(p); err != nil {
fmt.Println("handler sip response message failed:", err.Error())
continue
@@ -195,71 +79,15 @@ func (c *Core) Handler() {
}
}
//发送消息:发送请求或者响应
//发送消息仅负责发送。报错有两种1、发送错误。2、发送了但是超时没有收到响应
//如果发送成功,如何判断是否收到响应?没有收到响应要重传
//所以一个transaction 有read和wriet的chan。
//发送的时候写 write chan
//接收的时候读取 read chan
//发送之后就开启timer超时重传还要记录和修改每次超时时间。不超时的话记得删掉timer
//发送 register 消息
func (c *Core) SendMessage(msg *sip.Message) *Response {
method := msg.GetMethod()
// data, _ := sip.Encode(msg)
// fmt.Println("send message:", method)
evt := getOutGoingMessageEvent(msg)
tid := getMessageTransactionID(msg)
//匹配事物
c.mutex.RLock()
ta, ok := c.transactions[tid]
c.mutex.RUnlock()
if !ok {
//新的请求
ta = c.initTransaction(c.ctx, tid, msg)
//如果是sip 消息事件则将消息缓存填充typo和state
if msg.IsRequest() {
//as uac
if method == sip.INVITE || method == sip.ACK {
ta.typo = FSM_ICT
ta.state = ICT_PRE_CALLING
} else {
ta.typo = FSM_NICT
ta.state = NICT_PRE_TRYING
}
} else {
//as uas:send response
}
c.AddTransaction(ta)
}
//把event推到transaction
ta.Run(evt, msg)
<-ta.Done()
if ta.lastResponse != nil {
return &Response{
Code: ta.lastResponse.GetStatusCode(),
Data: ta.lastResponse,
Message: ta.lastResponse.GetReason(),
}
} else {
return &Response{
Code: 504,
}
}
}
//接收到的消息处理
//收到消息有两种1、请求消息 2、响应消息
//请求消息则直接响应处理。
//响应消息则需要匹配到请求让请求的transaction来处理。
//TODO参考srs和osip的流程以及文档做最终处理。需要将逻辑分成两层TU 层和 transaction 层
func (c *Core) HandleReceiveMessage(p *transport.Packet) (err error) {
// fmt.Println("packet content:", string(p.Data))
var msg *sip.Message
msg, err = sip.Decode(p.Data)
//Println("packet content:", string(p.Data))
//var msg *Message
msg, err := Decode(p.Data)
if err != nil {
fmt.Println("parse sip message failed:", err.Error())
return ErrorParse
@@ -273,68 +101,81 @@ func (c *Core) HandleReceiveMessage(p *transport.Packet) (err error) {
return err
}
//fmt.Println("receive message:", msg.GetMethod())
evt := getInComingMessageEvent(msg)
tid := getMessageTransactionID(msg)
//一般应该是uas对于接收到的request做预处理
if msg.IsRequest() {
fixReceiveMessageViaParams(msg, p.Addr)
req := &Request{Message: msg}
req.SourceAdd = p.Addr
req.DestAdd = c.udpaddr
c.handlerRequest(req)
} else {
//TODO:对于uac收到response消息是否要检查 rport 和 received 呢因为uas可能对此做了修改
resp := &Response{Message: msg}
resp.SourceAdd = p.Addr
resp.DestAdd = c.udpaddr
c.handlerResponse(resp)
}
//TODOCANCEL、BYE 和 ACK 需要特殊处理使用事物或者直接由TU层处理
//查找transaction
c.mutex.RLock()
ta, ok := c.transactions[tid]
c.mutex.RUnlock()
method := msg.GetMethod()
if msg.IsRequest() {
switch method {
case sip.ACK:
//TODO:this should be a ACK for 2xx (but could be a late ACK!)
return
case sip.BYE:
c.Send(msg.BuildOK())
return
case sip.MESSAGE:
if c.OnMessage(msg) && ta == nil {
c.Send(msg.BuildOK())
}
if ta != nil {
m := msg.BuildOK()
ta.Run(getOutGoingMessageEvent(m), m)
}
case sip.REGISTER:
if !ok {
ta = c.initTransaction(c.ctx, tid, msg)
ta.typo = FSM_NIST
ta.state = NIST_PROCEEDING
c.AddTransaction(ta)
}
c.OnRegister(msg)
m := msg.BuildOK()
m.Contact = msg.Contact
m.Expires = msg.Expires
ta.Run(getOutGoingMessageEvent(m), m)
//case sip.INVITE:
// ta.typo = FSM_IST
// ta.state = IST_PRE_PROCEEDING
case sip.CANCEL:
//TODO:CANCEL处理
/* special handling for CANCEL */
/* in the new spec, if the CANCEL has a Via branch, then it
is the same as the one in the original INVITE */
return
}
} else if ok {
ta.Run(evt, msg)
}
//TODOTU层处理根据需要创建或者匹配 Dialog
//通过tag匹配到call和dialog
//处理是否要重传ack
return
}
func (c *Core) Send(msg *sip.Message) error {
func (c *Core) NewTX(key string) *GBTx {
tx := c.txs.NewTX(key, *c.tp.Conn())
tx.Core = c
return tx
}
func (c *Core) GetTX(key string) *GBTx {
return c.txs.GetTX(key)
}
func (c *Core) MustTX(key string) *GBTx {
tx := c.txs.GetTX(key)
if tx == nil {
tx = c.NewTX(key)
}
return tx
}
func (c *Core) handlerRequest(msg *Request) {
tx := c.MustTX(GetTXKey(msg.Message))
//Println("receive request from:", msg.Source(), ",method:", msg.GetMethod(), "txKey:", tx.Key(), "message: \n", string(encode))
c.hmu.RLock()
handler, ok := c.requestHandlers[msg.GetMethod()]
c.hmu.RUnlock()
if !ok {
encode, _ := Encode(msg.Message)
Println("not found handler func,requestMethod:", msg.GetMethod(), msg.Event, encode)
go handlerMethodNotAllowed(msg, tx)
return
}
go handler(msg, tx)
}
func (c *Core) handlerResponse(msg *Response) {
tx := c.GetTX(GetTXKey(msg.Message))
if tx == nil {
str, _ := Encode(msg.Message)
Println("not found tx. receive response from:", msg.Source(), "message: \n", string(str))
} else {
tx.ReceiveResponse(msg)
}
}
func handlerMethodNotAllowed(req *Request, tx *GBTx) {
var resp Response
resp.Message = req.BuildResponse(http.StatusMethodNotAllowed)
resp.DestAdd = req.SourceAdd
resp.SourceAdd = req.DestAdd
_ = tx.Respond(&resp)
}
func (c *Core) SipRequestForResponse(req *Request) (response *Response, err error) {
var tx *GBTx
tx, err = c.Request(req)
if err == nil {
return tx.SipResponse()
}
return
}
func (c *Core) ResolveAddress(msg *Message) (destAddr net.Addr, err error) {
addr := msg.Addr
if addr == "" {
@@ -361,22 +202,49 @@ func (c *Core) Send(msg *sip.Message) error {
// fmt.Println("dest addr:", addr)
var err1, err2 error
pkt := &transport.Packet{}
pkt.Data, err1 = sip.Encode(msg)
if msg.Via.Transport == "UDP" {
pkt.Addr, err2 = net.ResolveUDPAddr("udp", addr)
destAddr, err2 = net.ResolveUDPAddr("udp", addr)
} else {
pkt.Addr, err2 = net.ResolveTCPAddr("tcp", addr)
destAddr, err2 = net.ResolveTCPAddr("tcp", addr)
}
if err1 != nil {
return err1
return nil, err1
}
if err2 != nil {
return err2
return nil, err2
}
c.tp.WritePacket(pkt)
return nil
return destAddr, nil
}
// Request Request
func (c *Core) Request(req *Request) (*GBTx, error) {
if req.Via == nil {
var viaHop Via
viaHop.Host = c.SipIP
viaHop.Port = strconv.Itoa(int(c.SipPort))
viaHop.Params = make(map[string]string)
viaHop.Params["branch"] = RandBranch()
viaHop.Params["rport"] = ""
req.Via = &viaHop
}
tx := c.MustTX(GetTXKey(req.Message))
return tx, tx.Request(req)
}
// Request Request
func (c *Core) Respond(resp *Response) (*GBTx, error) {
tx := c.MustTX(GetTXKey(resp.Message))
return tx, tx.Respond(resp)
}
// RegistHandler RegistHandler
func (c *Core) RegistHandler(method Method, handler RequestHandler) {
c.hmu.Lock()
c.requestHandlers[method] = handler
c.hmu.Unlock()
}

View File

@@ -1,155 +0,0 @@
package transaction
import (
"github.com/Monibuca/plugin-gb28181/v3/sip"
)
/*
|INVITE from TU
Timer A fires |INVITE sent
Reset A, V Timer B fires
INVITE sent +-----------+ or Transport Err.
+---------| |---------------+inform TU
| | Calling | |
+-------->| |-------------->|
+-----------+ 2xx |
| | 2xx to TU |
| |1xx |
300-699 +---------------+ |1xx to TU |
ACK sent | | |
resp. to TU | 1xx V |
| 1xx to TU -----------+ |
| +---------| | |
| | |Proceeding |-------------->|
| +-------->| | 2xx |
| +-----------+ 2xx to TU |
| 300-699 | |
| ACK sent, | |
| resp. to TU| |
| | | NOTE:
| 300-699 V |
| ACK sent +-----------+Transport Err. | transitions
| +---------| |Inform TU | labeled with
| | | Completed |-------------->| the event
| +-------->| | | over the action
| +-----------+ | to take
| ^ | |
| | | Timer D fires |
+--------------+ | - |
| |
V |
+-----------+ |
| | |
| Terminated|<--------------+
| |
+-----------+
Figure 5: INVITE client transaction
*/
func ict_snd_invite(t *Transaction, evt Event, m *sip.Message) error {
t.isReliable = m.IsReliable()
t.origRequest = m
t.state = ICT_CALLING
//发送出去之后,开启 timer
if m.IsReliable() {
//stop timer E in reliable transport
//fmt.Println("Reliabel")
} else {
//fmt.Println("Not Reliable")
//发送定时器,每次加倍,没有上限?
t.timerA = NewSipTimer(T1, 0, func() {
if t.Err() == nil {
t.Run(TIMEOUT_A, nil)
}
})
}
t.RunAfter(TimeB, TIMEOUT_B)
return nil
}
func osip_ict_timeout_a_event(t *Transaction, evt Event, m *sip.Message) error {
err := t.SipSend(t.origRequest)
if err != nil {
//发送失败
t.Terminate()
return err
}
t.timerA.Reset(t.timerA.timeout * 2)
return nil
}
func osip_ict_timeout_b_event(t *Transaction, evt Event, m *sip.Message) error {
t.Terminate()
return nil
}
func ict_rcv_1xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
t.state = ICT_PROCEEDING
return nil
}
func ict_rcv_2xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
t.Terminate()
return nil
}
func ict_rcv_3456xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
if t.state != ICT_COMPLETED {
/* not a retransmission */
/* automatic handling of ack! */
ack := ict_create_ack(t, m)
t.ack = ack
_ = t.SipSend(t.ack)
t.Terminate()
}
t.RunAfter(TimeD, TIMEOUT_D)
t.state = ICT_COMPLETED
return nil
}
func ict_create_ack(t *Transaction, resp *sip.Message) *sip.Message {
return &sip.Message{
Mode: t.origRequest.Mode,
Addr: t.origRequest.Addr,
StartLine: &sip.StartLine{
Method: sip.ACK,
Uri: t.origRequest.StartLine.Uri,
},
MaxForwards: t.origRequest.MaxForwards,
CallID: t.callID,
Contact: t.origRequest.Contact,
UserAgent: t.origRequest.UserAgent,
Via: t.via,
From: t.from,
To: t.to,
CSeq: &sip.CSeq{
ID: 1,
Method: sip.ACK,
},
}
}
func ict_retransmit_ack(t *Transaction, evt Event, m *sip.Message) error {
if t.ack == nil {
/* ??? we should make a new ACK and send it!!! */
return nil
}
err := t.SipSend(t.ack)
if err != nil {
return err
}
t.state = ICT_COMPLETED
return nil
}
func osip_ict_timeout_d_event(t *Transaction, evt Event, m *sip.Message) error {
t.Terminate()
return nil
}

View File

@@ -1,28 +0,0 @@
package transaction
import "github.com/Monibuca/plugin-gb28181/v3/sip"
func ist_rcv_invite(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func osip_ist_timeout_g_event(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func osip_ist_timeout_h_event(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func ist_snd_1xx(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func ist_snd_2xx(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func ist_snd_3456xx(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func ist_rcv_ack(t *Transaction, evt Event,m *sip.Message) error {
return nil
}
func osip_ist_timeout_i_event(t *Transaction, evt Event,m *sip.Message) error {
return nil
}

View File

@@ -1,134 +0,0 @@
package transaction
import (
"github.com/Monibuca/plugin-gb28181/v3/sip"
)
/*
非invite事物的状态机
|Request from TU
|send request
Timer E V
send request +-----------+
+---------| |-------------------+
| | Trying | Timer F |
+-------->| | or Transport Err.|
+-----------+ inform TU |
200-699 | | |
resp. to TU | |1xx |
+---------------+ |resp. to TU |
| | |
| Timer E V Timer F |
| send req +-----------+ or Transport Err. |
| +---------| | inform TU |
| | |Proceeding |------------------>|
| +-------->| |-----+ |
| +-----------+ |1xx |
| | ^ |resp to TU |
| 200-699 | +--------+ |
| resp. to TU | |
| | |
| V |
| +-----------+ |
| | | |
| | Completed | |
| | | |
| +-----------+ |
| ^ | |
| | | Timer K |
+--------------+ | - |
| |
V |
NOTE: +-----------+ |
| | |
transitions | Terminated|<------------------+
labeled with | |
the event +-----------+
over the action
to take
Figure 6: non-INVITE client transaction
*/
func nict_snd_request(t *Transaction, evt Event, m *sip.Message) error {
//fmt.Println("nict request:", msg.GetMethod())
t.origRequest = m
t.state = NICT_TRYING
err := t.SipSend(m)
if err != nil {
t.Terminate()
return err
}
//发送出去之后,开启 timer
if m.IsReliable() {
//stop timer E in reliable transport
//fmt.Println("Reliabel")
} else {
//fmt.Println("Not Reliable")
//发送定时器
t.timerE = NewSipTimer(T1, T2, func() {
if t.Err() == nil {
t.Run(TIMEOUT_E, nil)
}
})
}
t.RunAfter(TimeF, TIMEOUT_F)
return nil
}
//事物超时
func osip_nict_timeout_f_event(t *Transaction, evt Event, m *sip.Message) error {
t.Terminate()
return nil
}
func osip_nict_timeout_e_event(t *Transaction, evt Event, m *sip.Message) error {
if t.state == NICT_TRYING {
//reset timer
t.timerE.Reset(t.timerE.timeout * 2)
} else {
//in PROCEEDING STATE, TIMER is always T2
t.timerE.Reset(T2)
}
//resend origin request
err := t.SipSend(t.origRequest)
if err != nil {
t.Terminate()
return err
}
return nil
}
func nict_rcv_1xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
t.state = NICT_PROCEEDING
//重置发送定时器
t.timerE.Reset(T2)
return nil
}
func nict_rcv_23456xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
t.state = NICT_COMPLETED
t.Terminate()
// if m.IsReliable() {
// //不设置timerK
// } else {
// t.RunAfter(T4*64, TIMEOUT_K)
// }
return nil
}
func osip_nict_timeout_k_event(t *Transaction, evt Event, m *sip.Message) error {
t.Terminate()
return nil
}

View File

@@ -1,105 +0,0 @@
package transaction
import (
"fmt"
"github.com/Monibuca/plugin-gb28181/v3/sip"
)
/*
|Request received
|pass to TU
V
+-----------+
| |
| Trying |-------------+
| | |
+-----------+ |200-699 from TU
| |send response
|1xx from TU |
|send response |
| |
Request V 1xx from TU |
send response+-----------+send response|
+--------| |--------+ |
| | Proceeding| | |
+------->| |<-------+ |
+<--------------| | |
|Trnsprt Err +-----------+ |
|Inform TU | |
| | |
| |200-699 from TU |
| |send response |
| Request V |
| send response+-----------+ |
| +--------| | |
| | | Completed |<------------+
| +------->| |
+<--------------| |
|Trnsprt Err +-----------+
|Inform TU |
| |Timer J fires
| |-
| |
| V
| +-----------+
| | |
+-------------->| Terminated|
| |
+-----------+
Figure 8: non-INVITE server transaction
*/
func nist_rcv_request(t *Transaction, evt Event, m *sip.Message) error {
fmt.Println("rcv request: ", m.GetMethod())
fmt.Println("transaction state: ", t.state.String())
if t.state != NIST_PRE_TRYING {
fmt.Println("rcv request retransmission,do response")
if t.lastResponse != nil {
err := t.SipSend(t.lastResponse)
if err != nil {
//transport error
return err
}
}
return nil
} else {
t.origRequest = m
t.state = NIST_TRYING
t.isReliable = m.IsReliable()
}
return nil
}
func nist_snd_1xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
err := t.SipSend(t.lastResponse)
if err != nil {
return err
}
t.state = NIST_PROCEEDING
return nil
}
func nist_snd_23456xx(t *Transaction, evt Event, m *sip.Message) error {
t.lastResponse = m
if err := t.SipSend(t.lastResponse); err != nil {
return err
}
if t.state != NIST_COMPLETED {
if !t.isReliable {
t.RunAfter(T1*64, TIMEOUT_J)
}
}
t.state = NIST_COMPLETED
return nil
}
func osip_nist_timeout_j_event(t *Transaction, evt Event, m *sip.Message) error {
t.Terminate()
return nil
}

View File

@@ -1,430 +0,0 @@
package transaction
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transport"
)
//状态机之状态
type State int
const (
/* STATES for invite client transaction */
ICT_PRE_CALLING State = iota
ICT_CALLING
ICT_PROCEEDING
ICT_COMPLETED
ICT_TERMINATED
/* STATES for invite server transaction */
IST_PRE_PROCEEDING
IST_PROCEEDING
IST_COMPLETED
IST_CONFIRMED
IST_TERMINATED
/* STATES for NON-invite client transaction */
NICT_PRE_TRYING
NICT_TRYING
NICT_PROCEEDING
NICT_COMPLETED
NICT_TERMINATED
/* STATES for NON-invite server transaction */
NIST_PRE_TRYING
NIST_TRYING
NIST_PROCEEDING
NIST_COMPLETED
NIST_TERMINATED
/* STATES for dialog */
DIALOG_EARLY
DIALOG_CONFIRMED
DIALOG_CLOSE
)
var stateMap = map[State]string{
ICT_PRE_CALLING: "ICT_PRE_CALLING",
ICT_CALLING: "ICT_CALLING",
ICT_PROCEEDING: "ICT_PROCEEDING",
ICT_COMPLETED: "ICT_COMPLETED",
ICT_TERMINATED: "ICT_TERMINATED",
IST_PRE_PROCEEDING: "IST_PRE_PROCEEDING",
IST_PROCEEDING: "IST_PROCEEDING",
IST_COMPLETED: "IST_COMPLETED",
IST_CONFIRMED: "IST_CONFIRMED",
IST_TERMINATED: "IST_TERMINATED",
NICT_PRE_TRYING: "NICT_PRE_TRYING",
NICT_TRYING: "NICT_TRYING",
NICT_PROCEEDING: "NICT_PROCEEDING",
NICT_COMPLETED: "NICT_COMPLETED",
NICT_TERMINATED: "NICT_TERMINATED",
NIST_PRE_TRYING: "NIST_PRE_TRYING",
NIST_TRYING: "NIST_TRYING",
NIST_PROCEEDING: "NIST_PROCEEDING",
NIST_COMPLETED: "NIST_COMPLETED",
NIST_TERMINATED: "NIST_TERMINATED",
DIALOG_EARLY: "DIALOG_EARLY",
DIALOG_CONFIRMED: "DIALOG_CONFIRMED",
DIALOG_CLOSE: "DIALOG_CLOSE",
}
func (s State) String() string {
return stateMap[s]
}
//状态机之事件
type Event int
const (
/* TIMEOUT EVENTS for ICT */
TIMEOUT_A Event = iota /**< Timer A */
TIMEOUT_B /**< Timer B */
TIMEOUT_D /**< Timer D */
/* TIMEOUT EVENTS for NICT */
TIMEOUT_E /**< Timer E */
TIMEOUT_F /**< Timer F */
TIMEOUT_K /**< Timer K */
/* TIMEOUT EVENTS for IST */
TIMEOUT_G /**< Timer G */
TIMEOUT_H /**< Timer H */
TIMEOUT_I /**< Timer I */
/* TIMEOUT EVENTS for NIST */
TIMEOUT_J /**< Timer J */
/* FOR INCOMING MESSAGE */
RCV_REQINVITE /**< Event is an incoming INVITE request */
RCV_REQACK /**< Event is an incoming ACK request */
RCV_REQUEST /**< Event is an incoming NON-INVITE and NON-ACK request */
RCV_STATUS_1XX /**< Event is an incoming informational response */
RCV_STATUS_2XX /**< Event is an incoming 2XX response */
RCV_STATUS_3456XX /**< Event is an incoming final response (not 2XX) */
/* FOR OUTGOING MESSAGE */
SND_REQINVITE /**< Event is an outgoing INVITE request */
SND_REQACK /**< Event is an outgoing ACK request */
SND_REQUEST /**< Event is an outgoing NON-INVITE and NON-ACK request */
SND_STATUS_1XX /**< Event is an outgoing informational response */
SND_STATUS_2XX /**< Event is an outgoing 2XX response */
SND_STATUS_3456XX /**< Event is an outgoing final response (not 2XX) */
KILL_TRANSACTION /**< Event to 'kill' the transaction before termination */
UNKNOWN_EVT /**< Max event */
)
var eventMap = map[Event]string{
TIMEOUT_A: "TIMEOUT_A",
TIMEOUT_B: "TIMEOUT_B",
TIMEOUT_D: "TIMEOUT_D",
TIMEOUT_E: "TIMEOUT_E",
TIMEOUT_F: "TIMEOUT_F",
TIMEOUT_K: "TIMEOUT_K",
TIMEOUT_G: "TIMEOUT_G",
TIMEOUT_H: "TIMEOUT_H",
TIMEOUT_I: "TIMEOUT_I",
TIMEOUT_J: "TIMEOUT_J",
RCV_REQINVITE: "RCV_REQINVITE",
RCV_REQACK: "RCV_REQACK",
RCV_REQUEST: "RCV_REQUEST",
RCV_STATUS_1XX: "RCV_STATUS_1XX",
RCV_STATUS_2XX: "RCV_STATUS_2XX",
RCV_STATUS_3456XX: "RCV_STATUS_3456XX",
SND_REQINVITE: "SND_REQINVITE",
SND_REQACK: "SND_REQACK",
SND_REQUEST: "SND_REQUEST",
SND_STATUS_1XX: "SND_STATUS_1XX",
SND_STATUS_2XX: "SND_STATUS_2XX",
SND_STATUS_3456XX: "SND_STATUS_3456XX",
KILL_TRANSACTION: "KILL_TRANSACTION",
UNKNOWN_EVT: "UNKNOWN_EVT",
}
func (e Event) String() string {
return eventMap[e]
}
//incoming SIP MESSAGE
func (e Event) IsIncomingMessage() bool {
return e >= RCV_REQINVITE && e <= RCV_STATUS_3456XX
}
//incoming SIP REQUEST
func (e Event) IsIncomingRequest() bool {
return e == RCV_REQINVITE || e == RCV_REQACK || e == RCV_REQUEST
}
//incoming SIP RESPONSE
func (e Event) IsIncomingResponse() bool {
return e == RCV_STATUS_1XX || e == RCV_STATUS_2XX || e == RCV_STATUS_3456XX
}
//outgoing SIP MESSAGE
func (e Event) IsOutgoingMessage() bool {
return e >= SND_REQINVITE && e <= SND_REQINVITE
}
//outgoing SIP REQUEST
func (e Event) IsOutgoingRequest() bool {
return e == SND_REQINVITE || e == SND_REQACK || e == SND_REQUEST
}
//outgoing SIP RESPONSE
func (e Event) IsOutgoingResponse() bool {
return e == SND_STATUS_1XX || e == SND_STATUS_2XX || e == SND_STATUS_3456XX
}
//a SIP MESSAGE
func (e Event) IsSipMessage() bool {
return e >= RCV_REQINVITE && e <= SND_STATUS_3456XX
}
type EventObj struct {
evt Event // event type
tid string // transaction id
msg *sip.Message
}
//状态机类型
type FSMType int
const (
FSM_ICT FSMType = iota /**< Invite Client (outgoing) Transaction */
FSM_IST /**< Invite Server (incoming) Transaction */
FSM_NICT /**< Non-Invite Client (outgoing) Transaction */
FSM_NIST /**< Non-Invite Server (incoming) Transaction */
FSM_UNKNOWN /**< Invalid Transaction */
)
var typeMap = map[FSMType]string{
FSM_ICT: "FSM_ICT",
FSM_IST: "FSM_IST",
FSM_NICT: "FSM_NICT",
FSM_NIST: "FSM_NIST",
FSM_UNKNOWN: "FSM_UNKNOWN",
}
func (t FSMType) String() string {
return typeMap[t]
}
//对外将sip通讯封装成请求和响应
//TODO可参考http的request和response屏蔽sip协议细节
type Request struct {
data *sip.Message
}
//Code = 0则响应正常
//Code != 0打印错误提示信息 Message
type Response struct {
Code int
Message string
Data *sip.Message
}
type Handler func(*Transaction, Event, *sip.Message) error //操作
type Header map[string]string
// timer相关基础常量、方法等定义
const (
T1 = 100 * time.Millisecond
T2 = 4 * time.Second
T4 = 5 * time.Second
TimeA = T1
TimeB = 64 * T1
TimeD = 32 * time.Second
TimeE = T1
TimeF = 64 * T1
TimeG = T1
TimeH = 64 * T1
TimeI = T4
TimeJ = 64 * T1
TimeK = T4
Time1xx = 100 * time.Millisecond
)
//TODO是否要管理当前 transaction 的多次请求和响应的message
//TODO是否要管理当前 transaction 的头域
//TODO多种transaction在一个struct里面管理不太方便暂时写在一起后期重构分开并使用interface 解耦
//是否需要tp layer
type Transaction struct {
cancel context.CancelFunc
context.Context //线程管理、其他参数
sync.Mutex
id string //transaction ID
isReliable bool //是否可靠传输
core *Core //全局参数
typo FSMType //状态机类型
state State //当前状态
response chan *Response //输出的响应
startAt time.Time //开始时间
endAt time.Time //结束时间
//messages []*sip.Message //传输的消息缓存origin request/last response/request ack...
//header Header //创建事物的消息头域参数:Via From To CallID CSeq
via *sip.Via
from *sip.Contact
to *sip.Contact
callID string
cseq *sip.CSeq
origRequest *sip.Message //Initial request
lastResponse *sip.Message //Last response可能是临时的也可能是最终的
ack *sip.Message //ack request sent
//timer for ict
timerA *SipTimer
//timer for nict
timerE *SipTimer
}
type SipTimer struct {
tm *time.Timer
timeout time.Duration //当前超时时间
max time.Duration //最大超时时间
}
func NewSipTimer(d, max time.Duration, f func()) *SipTimer {
return &SipTimer{
tm: time.AfterFunc(d, f),
timeout: d,
max: max,
}
}
func (t *SipTimer) Reset(d time.Duration) {
t.timeout = d
if t.timeout > t.max && t.max != 0 {
t.timeout = t.max
}
t.tm.Reset(t.timeout)
}
func (ta *Transaction) SetState(s State) {
ta.state = s
}
func (ta *Transaction) GetTid() string {
return ta.id
}
func (ta *Transaction) RunAfter(t time.Duration, evt Event) {
time.AfterFunc(t, func() {
if ta.Err() == nil {
ta.Run(evt, nil)
}
})
}
//每一个transaction至少有一个状态机线程运行
//TODO:如果是一个uac的transaction则把最后响应的消息返回通过response chan
//transaction有很多消息需要传递到TU也接收来自TU的消息。
func (ta *Transaction) Run(evt Event, m *sip.Message) {
//根据event调用对应的handler
//fmt.Println("fsm run event:", e.evt.String())
core := ta.core
ta.Lock()
defer ta.Unlock()
evtHandlers, ok1 := core.handlers[ta.state]
if !ok1 {
//fmt.Println("invalid state:", ta.state.String())
return
}
f, ok2 := evtHandlers[evt]
if !ok2 {
//fmt.Println("invalid handler for this event:", e.evt.String())
return
}
//fmt.Printf("state:%s, event:%s\n", state.String(), e.evt.String())
err := f(ta, evt, m)
if err != nil {
//fmt.Printf("transaction run failed, state:%s, event:%s\n", state.String(), e.evt.String())
}
}
//Terminated:事物的终止
//TODOcheck调用时机
func (ta *Transaction) Terminate() {
ta.state = NICT_TERMINATED
switch ta.typo {
case FSM_ICT:
ta.state = ICT_TERMINATED
case FSM_NICT:
ta.state = NICT_TERMINATED
case FSM_IST:
ta.state = IST_TERMINATED
case FSM_NIST:
ta.state = NIST_TERMINATED
}
//关掉事物
ta.cancel()
//TODO某些timer需要检查并关掉并且设置为nil
ta.core.DelTransaction(ta.id)
}
//根据sip消息解析出目标服务器地址发送消息
func (ta *Transaction) SipSend(msg *sip.Message) error {
err := checkMessage(msg)
if err != nil {
return err
}
addr := msg.Addr
if addr == "" {
viaParams := msg.Via.Params
//host
var host, port string
var ok1, ok2 bool
if host, ok1 = viaParams["maddr"]; !ok1 {
if host, ok2 = viaParams["received"]; !ok2 {
host = msg.Via.Host
}
}
//port
port = viaParams["rport"]
if port == "" || port == "0" || port == "-1" {
port = msg.Via.Port
}
if port == "" {
port = "5060"
}
addr = fmt.Sprintf("%s:%s", host, port)
}
//fmt.Println("dest addr:", addr)
var err1, err2 error
pkt := &transport.Packet{}
pkt.Data, err1 = sip.Encode(msg)
if msg.Via.Transport == "UDP" {
pkt.Addr, err2 = net.ResolveUDPAddr("udp", addr)
} else {
pkt.Addr, err2 = net.ResolveTCPAddr("tcp", addr)
}
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
ta.core.tp.WritePacket(pkt)
return nil
}

View File

@@ -0,0 +1,27 @@
package transaction
import (
"context"
"fmt"
"testing"
"time"
)
func TestTimeout(t *testing.T) {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go func() {
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
default:
time.Sleep(1000 * time.Millisecond)
fmt.Println("fuck gc.")
}
}()
}

157
transaction/tx.go Normal file
View File

@@ -0,0 +1,157 @@
package transaction
import (
"github.com/Monibuca/plugin-gb28181/v3/sip"
. "github.com/Monibuca/plugin-gb28181/v3/transport"
"github.com/Monibuca/plugin-gb28181/v3/utils"
"net/http"
"sync"
"time"
)
var ActiveTX *GBTxs
// GBTxs a GBTxs stands for a Gb28181 Transaction collection
type GBTxs struct {
Txs map[string]*GBTx
RWM *sync.RWMutex
}
func (txs *GBTxs) NewTX(key string, conn Connection) *GBTx {
tx := NewTransaction(key, conn)
txs.RWM.Lock()
txs.Txs[key] = tx
txs.RWM.Unlock()
return tx
}
func (txs *GBTxs) GetTX(key string) *GBTx {
txs.RWM.RLock()
tx, ok := txs.Txs[key]
if !ok {
tx = nil
}
txs.RWM.RUnlock()
return tx
}
func (txs *GBTxs) rmTX(tx *GBTx) {
txs.RWM.Lock()
delete(txs.Txs, tx.key)
txs.RWM.Unlock()
}
// GBTx Gb28181 Transaction
type GBTx struct {
conn Connection
key string
resp chan *sip.Response
active chan int
*Core
}
// NewTransaction create a new GBtx
func NewTransaction(key string, conn Connection) *GBTx {
tx := &GBTx{conn: conn, key: key, resp: make(chan *sip.Response, 10), active: make(chan int, 1)}
go tx.watch()
return tx
}
// Key returns the GBTx Key
func (tx *GBTx) Key() string {
return tx.key
}
func (tx *GBTx) watch() {
for {
select {
case <-tx.active:
//Println("active tx", tx.Key(), time.Now().Format("2006-01-02 15:04:05"))
case <-time.After(20 * time.Second):
tx.Close()
//Println("watch closed tx", tx.key, time.Now().Format("2006-01-02 15:04:05"))
return
}
}
}
// GetResponse GetResponse
func (tx *GBTx) GetResponse() *sip.Response {
for {
res := <-tx.resp
if res == nil {
return res
}
tx.active <- 2
//Println("response tx", tx.key, time.Now().Format("2006-01-02 15:04:05"))
if res.GetStatusCode() == http.StatusContinue || res.GetStatusCode() == http.StatusSwitchingProtocols {
// Trying and Dialog Establishement 等待下一个返回
continue
}
return res
}
}
// Close the Close function closes the GBTx
func (tx *GBTx) Close() {
//Printf("closed tx: %s %s TXs: %d", tx.key, time.Now().Format("2006-01-02 15:04:05"), len(ActiveTX.Txs))
ActiveTX.rmTX(tx)
close(tx.resp)
close(tx.active)
}
// ReceiveResponse receive a Response
func (tx *GBTx) ReceiveResponse(msg *sip.Response) {
defer func() {
if r := recover(); r != nil {
//Println("send to closed channel, txkey:", tx.key, "message: \n", msg)
}
}()
//Println("receiveResponse tx", tx.Key(), time.Now().Format("2006-01-02 15:04:05"))
tx.resp <- msg
tx.active <- 1
}
// Respond Respond
func (tx *GBTx) Respond(res *sip.Response) error {
str, _ := sip.Encode(res.Message)
//Println("send response,to:", (res.DestAdd).String(), "txkey:", tx.key, "message: \n", string(str))
_, err := tx.conn.WriteTo(str, res.DestAdd)
return err
}
// Request Request
func (tx *GBTx) Request(req *sip.Request) error {
str, _ := sip.Encode(req.Message)
//Println("send Request,to:", (req.DestAdd).String(), "txkey:", tx.key, "message: \n", string(str))
_, err := tx.conn.WriteTo(str, req.DestAdd)
return err
}
func GetTXKey(msg *sip.Message) (key string) {
if len(msg.CallID) > 0 {
key = msg.CallID
} else {
key = utils.RandString(10)
}
return
}
func (tx *GBTx) SipResponse() (*sip.Response, error) {
response := tx.GetResponse()
if response == nil {
return nil, utils.NewError(nil, "response timeout", "tx key:", tx.Key())
}
if response.GetStatusCode() != http.StatusOK {
return response, utils.NewError(nil, "response fail", response.GetStatusCode(), response.GetReason(), "tx key:", tx.Key())
}
return response, nil
}
func (tx *GBTx) SipRequestForResponse(req *sip.Request) (response *sip.Response, err error) {
err = tx.Request(req)
if err == nil {
return tx.SipResponse()
}
return
}

View File

@@ -17,71 +17,11 @@ import (
//参考RFC3261
func getMessageTransactionID(m *sip.Message) string {
if m.GetMethod() == sip.ACK {
//TODO在匹配服务端事的ACK中创建事务的请求的方法为INVITE。所以ACK消息匹配事的时候需要注意????
//TODO在匹配服务端事的ACK中创建事务的请求的方法为INVITE。所以ACK消息匹配事的时候需要注意????
}
return string(m.GetMethod()) + "_" + m.GetBranch()
}
//根据收到的响应的消息的状态码,获取事件
func getInComingMessageEvent(m *sip.Message) Event {
//request根据请求方法来确认事件
if m.IsRequest() {
method := m.GetMethod()
if method == sip.INVITE {
return RCV_REQINVITE
} else if method == sip.ACK {
return RCV_REQACK
} else {
return RCV_REQUEST
}
}
//response根据状态码来确认事件
status := m.StartLine.Code
if status >= 100 && status < 200 {
return RCV_STATUS_1XX
}
if status >= 200 && status < 300 {
return RCV_STATUS_2XX
}
if status >= 300 {
return RCV_STATUS_3456XX
}
return UNKNOWN_EVT
}
//根据发出的响应的消息的状态码,获取事件
func getOutGoingMessageEvent(m *sip.Message) Event {
//request:get event by method
if m.IsRequest() {
method := m.GetMethod()
if method == sip.INVITE {
return SND_REQINVITE
} else if method == sip.ACK {
return SND_REQACK
} else {
return SND_REQUEST
}
}
//response:get event by status
status := m.StartLine.Code
if status >= 100 && status < 200 {
return SND_STATUS_1XX
}
if status >= 200 && status < 300 {
return SND_STATUS_2XX
}
if status >= 300 {
return SND_STATUS_3456XX
}
return UNKNOWN_EVT
}
func checkMessage(msg *sip.Message) error {
//TODO:sip消息解析成功之后检查必要元素如果失败则返回 ErrorCheckMessage

115
transport/connection.go Normal file
View File

@@ -0,0 +1,115 @@
package transport
import (
. "github.com/Monibuca/utils/v3"
"net"
"strings"
"sync"
"time"
)
// Connection Wrapper around net.Conn.
type Connection interface {
net.Conn
Network() string
// String() string
ReadFrom(buf []byte) (num int, raddr net.Addr, err error)
WriteTo(buf []byte, raddr net.Addr) (num int, err error)
}
// Connection implementation.
type connection struct {
baseConn net.Conn
laddr net.Addr
raddr net.Addr
mu sync.RWMutex
logKey string
Online bool
ReconnectCount int64 //重连次数
}
func newUDPConnection(baseConn net.Conn) Connection {
conn := &connection{
baseConn: baseConn,
laddr: baseConn.LocalAddr(),
raddr: baseConn.RemoteAddr(),
logKey: "udpConnection",
}
return conn
}
func newTCPConnection(baseConn net.Conn) Connection {
conn := &connection{
baseConn: baseConn,
laddr: baseConn.LocalAddr(),
raddr: baseConn.RemoteAddr(),
logKey: "udpConnection",
}
return conn
}
func (conn *connection) Read(buf []byte) (int, error) {
var (
num int
err error
)
num, err = conn.baseConn.Read(buf)
return num, err
}
func (conn *connection) ReadFrom(buf []byte) (num int, raddr net.Addr, err error) {
num, raddr, err = conn.baseConn.(net.PacketConn).ReadFrom(buf)
if err != nil {
return num, raddr, err
}
Printf("readFrom %d , %s -> %s \n %s", num, raddr, conn.LocalAddr(), string(buf[:num]))
return num, raddr, err
}
func (conn *connection) Write(buf []byte) (int, error) {
var (
num int
err error
)
num, err = conn.baseConn.Write(buf)
return num, err
}
func (conn *connection) WriteTo(buf []byte, raddr net.Addr) (num int, err error) {
num, err = conn.baseConn.(net.PacketConn).WriteTo(buf, raddr)
if err != nil {
return num, err
}
//Printf("writeTo %d , %s -> %s \n %s", num, conn.baseConn.LocalAddr(), raddr.String(), string(buf[:num]))
return num, err
}
func (conn *connection) LocalAddr() net.Addr {
return conn.baseConn.LocalAddr()
}
func (conn *connection) RemoteAddr() net.Addr {
return conn.baseConn.RemoteAddr()
}
func (conn *connection) Close() error {
err := conn.baseConn.Close()
return err
}
func (conn *connection) Network() string {
return strings.ToUpper(conn.baseConn.LocalAddr().Network())
}
func (conn *connection) SetDeadline(t time.Time) error {
return conn.baseConn.SetDeadline(t)
}
func (conn *connection) SetReadDeadline(t time.Time) error {
return conn.baseConn.SetReadDeadline(t)
}
func (conn *connection) SetWriteDeadline(t time.Time) error {
return conn.baseConn.SetWriteDeadline(t)
}

View File

@@ -16,7 +16,7 @@ func RunServerTCP() {
tcp := NewTCPServer(SipPort, true)
go PacketHandler(tcp)
go func() {
_ = tcp.Start()
_ = tcp.StartAndWait()
}()
select {}
@@ -27,7 +27,7 @@ func RunClientTCP() {
c := NewTCPClient(SipHost, SipPort)
go PacketHandler(c)
go func() {
_ = c.Start()
_ = c.StartAndWait()
}()
//发送测试数据
@@ -69,7 +69,7 @@ func RunServerUDP() {
go PacketHandler(udp)
go func() {
_ = udp.Start()
_ = udp.StartAndWait()
}()
select {}
@@ -79,7 +79,7 @@ func RunClientUDP() {
c := NewUDPClient(SipHost, SipPort)
go PacketHandler(c)
go func() {
_ = c.Start()
_ = c.StartAndWait()
}()
//发送测试数据
go func() {

View File

@@ -9,7 +9,7 @@ type TCPClient struct {
Statistic
host string
port uint16
conn net.Conn
conn Connection
readChan chan *Packet
writeChan chan *Packet
remoteAddr net.Addr
@@ -43,7 +43,7 @@ func (c *TCPClient) RemoteAddr() net.Addr {
return c.remoteAddr
}
func (c *TCPClient) Start() error {
func (c *TCPClient) StartAndWait() error {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", c.host, c.port))
if err != nil {
fmt.Println("dial tcp server failed :", err.Error())
@@ -52,7 +52,7 @@ func (c *TCPClient) Start() error {
fmt.Println("start tcp client")
}
c.conn = conn
c.conn = newTCPConnection(conn)
c.remoteAddr = conn.RemoteAddr()
c.localAddr = conn.LocalAddr()
@@ -113,3 +113,7 @@ func (c *TCPClient) Heartbeat(p *Packet) {
}
c.WritePacket(p)
}
func (s *TCPClient) Conn() *Connection {
return &s.conn
}

View File

@@ -42,7 +42,7 @@ func (s *TCPServer) IsKeepalive() bool {
return s.Keepalive
}
func (s *TCPServer) Start() error {
func (s *TCPServer) StartAndWait() error {
//监听端口
//开启tcp连接线程
var err error
@@ -69,8 +69,8 @@ func (s *TCPServer) Start() error {
if !ok {
return
}
c := val.(*Connection)
_, _ = c.Conn.Write(p.Data)
c := *val.(*Connection)
_, _ = c.Write(p.Data)
case <-s.done:
return
}
@@ -104,17 +104,18 @@ func (s *TCPServer) Start() error {
}
// conn.SetReadDeadline(time.Now().Add(600 * time.Second))
session := &Connection{Conn: conn, Addr: conn.RemoteAddr()}
address := session.Addr.String()
session := newTCPConnection(conn)
address := session.RemoteAddr().String()
s.sessions.Store(address, session)
fmt.Println(fmt.Sprintf("new tcp client remoteAddr: %v", address))
go s.handlerSession(session)
go s.handlerSession(&session)
}
}
func (s *TCPServer) handlerSession(c *Connection) {
addrStr := c.Addr.String()
conn := *c
addrStr := conn.RemoteAddr().String()
//recovery from panic
defer func() {
@@ -126,16 +127,16 @@ func (s *TCPServer) handlerSession(c *Connection) {
buf := make([]byte, 2048)
for {
n, err := c.Conn.Read(buf)
n, err := conn.Read(buf)
switch {
case err == nil:
p := &Packet{
Addr: c.Addr,
Addr: conn.RemoteAddr(),
Data: buf[:n],
}
s.readChan <- p
case err == io.EOF:
fmt.Println(fmt.Sprintf("io.EOF,client close --- remoteAddr: %v", c.Addr))
fmt.Println(fmt.Sprintf("io.EOF,client close --- remoteAddr: %v", conn.RemoteAddr()))
return
case err != nil:
fmt.Println("client other err: ", err)
@@ -150,8 +151,8 @@ func (s *TCPServer) CloseOne(addr string) {
if !ok {
return
}
c := val.(*Connection)
_ = c.Conn.Close()
c := *val.(*Connection)
_ = c.Close()
s.sessions.Delete(addr)
}
@@ -165,10 +166,14 @@ func (s *TCPServer) WritePacket(packet *Packet) {
func (s *TCPServer) Close() error {
//TODOTCP服务退出之前需要先close掉所有客户端的连接
s.sessions.Range(func(key, value interface{}) bool {
c := value.(*Connection)
_ = c.Conn.Close()
c := *value.(*Connection)
_ = c.Close()
s.sessions.Delete(key)
return true
})
return nil
}
func (s *TCPServer) Conn() *Connection {
return nil
}

View File

@@ -23,9 +23,10 @@ type ITransport interface {
Name() string
ReadPacketChan() <-chan *Packet //读消息,消息处理器需在循环中阻塞读取
WritePacket(packet *Packet) //写消息
Start() error //开启连接,阻塞接收消息
StartAndWait() error //开启连接,阻塞接收消息
Close() error //关闭连接
IsReliable() bool //是否可靠传输
Conn() *Connection
}
type IServer interface {
@@ -48,18 +49,6 @@ type Packet struct {
Data []byte
}
//对于面向连接的UDP或者TCP都可以面向连接维持心跳即可必须有session
type Connection struct {
Addr net.Addr
Conn net.Conn
Online bool
ReconnectCount int64 //重连次数
}
func (s *Connection) Close() {
//TODO处理session的关闭修改缓存状态、数据库状态、发送离线通知、执行离线回调等等
}
//通讯统计
type Statistic struct {
startTime time.Time

View File

@@ -10,7 +10,7 @@ type UDPClient struct {
Statistic
host string
port uint16
conn *net.UDPConn
conn Connection
readChan chan *Packet
writeChan chan *Packet
done chan struct{}
@@ -43,7 +43,7 @@ func (c *UDPClient) RemoteAddr() net.Addr {
return c.remoteAddr
}
func (c *UDPClient) Start() error {
func (c *UDPClient) StartAndWait() error {
addrStr := fmt.Sprintf("%s:%d", c.host, c.port)
addr, err := net.ResolveUDPAddr("udp", addrStr)
if err != nil {
@@ -117,3 +117,7 @@ func (c *UDPClient) Heartbeat(p *Packet) {
}
c.WritePacket(p)
}
func (c *UDPClient) Conn() *Connection {
return &c.conn
}

View File

@@ -9,9 +9,9 @@ import (
type UDPServer struct {
Statistic
addr string
Conn *net.UDPConn
ReadChan chan *Packet
WriteChan chan *Packet
conn *Connection
readChan chan *Packet
writeChan chan *Packet
done chan struct{}
Keepalive bool
//Sessions sync.Map // key is remote-addr的string , value:*Connection。UDP不需要
@@ -22,8 +22,8 @@ func NewUDPServer(port uint16) IServer {
return &UDPServer{
addr: addrStr,
ReadChan: make(chan *Packet, 10),
WriteChan: make(chan *Packet, 10),
readChan: make(chan *Packet, 1024),
writeChan: make(chan *Packet, 1024),
done: make(chan struct{}),
}
}
@@ -40,7 +40,7 @@ func (s *UDPServer) IsKeepalive() bool {
return s.Keepalive
}
func (s *UDPServer) Start() error {
func (s *UDPServer) StartAndWait() error {
addr, err := net.ResolveUDPAddr("udp", s.addr)
if err != nil {
fmt.Println("Can't resolve address: ", err)
@@ -55,26 +55,14 @@ func (s *UDPServer) Start() error {
defer func() {
_ = conn.Close()
}()
s.Conn = conn
udpConnection := newUDPConnection(conn)
s.conn = &udpConnection
fmt.Println("start udp server at: ", s.addr)
//心跳线程
if s.Keepalive {
//TODO:start heartbeat thread
}
//写线程
go func() {
for {
select {
case p := <-s.WriteChan:
_, _ = s.Conn.WriteTo(p.Data, p.Addr)
case <-s.done:
return
}
}
}()
//读线程
for {
@@ -84,17 +72,17 @@ func (s *UDPServer) Start() error {
fmt.Println("failed to read UDP msg because of ", err.Error())
continue
}
s.ReadChan <- &Packet{
s.readChan <- &Packet{
Addr: remoteAddr,
Data: data[:n],
}
}
}
func (s *UDPServer) ReadPacketChan() <-chan *Packet {
return s.ReadChan
return s.readChan
}
func (s *UDPServer) WritePacket(packet *Packet) {
s.WriteChan <- packet
s.writeChan <- packet
}
func (s *UDPServer) Close() error {
@@ -104,3 +92,7 @@ func (s *UDPServer) Close() error {
func (s *UDPServer) CloseOne(addr string) {
//处理某设备离线
}
func (s *UDPServer) Conn() *Connection {
return s.conn
}

View File

@@ -1,6 +1,6 @@
Transaction User(TU)事务用户在transaction 层之上的协议层。TU包括了UAC core、UAS core,和proxy core。
tu处理业务逻辑并对事层进行操作。
tu处理业务逻辑并对事层进行操作。
#### 类型

View File

@@ -2,6 +2,7 @@ package tu
import (
"fmt"
"github.com/Monibuca/plugin-gb28181/v3/sip"
"github.com/Monibuca/plugin-gb28181/v3/transaction"
)
@@ -76,17 +77,19 @@ func RunClient() {
}
c := NewClient(config, static)
go c.Start()
go c.StartAndWait()
//TODO先发起注册
//TODO:build sip message
msg := BuildMessageRequest("", "", "", "", "", "",
0, 0, 0, "")
resp := c.SendMessage(msg)
if resp.Code != 0 {
fmt.Println("request failed")
tx := c.MustTX(transaction.GetTXKey(msg))
resp, err := tx.SipRequestForResponse(&sip.Request{Message: msg})
if err != nil {
fmt.Println("request failed, ", err)
}
fmt.Println("response: ", resp.Data)
fmt.Println("response: ", resp.Body)
select {}
}

View File

@@ -43,7 +43,7 @@ func RunServer() {
}
s := NewServer(config)
s.Start()
s.StartAndWait()
select {}
}

View File

@@ -1,39 +1,100 @@
package utils
import (
"bytes"
"encoding/binary"
"errors"
"io"
)
var ErrEOF = errors.New("eof")
type IOBuffer struct {
bytes.Buffer
buf []byte // contents are the bytes buf[off : len(buf)]
off int // read at &buf[off], write at &buf[len(buf)]
}
func (r *IOBuffer) Uint16() (uint16, error) {
if r.Len() > 1 {
return binary.BigEndian.Uint16(r.Next(2)), nil
func (b *IOBuffer) Next(n int) []byte {
m := b.Len()
if n > m {
n = m
}
return 0, ErrEOF
data := b.buf[b.off : b.off+n]
b.off += n
return data
}
func (b *IOBuffer) Uint16() (uint16, error) {
if b.Len() > 1 {
return binary.BigEndian.Uint16(b.Next(2)), nil
}
return 0, io.EOF
}
func (r *IOBuffer) Skip(n int) (err error) {
_, err = r.ReadN(n)
func (b *IOBuffer) Skip(n int) (err error) {
_, err = b.ReadN(n)
return
}
func (r *IOBuffer) Uint32() (uint32, error) {
if r.Len() > 3 {
return binary.BigEndian.Uint32(r.Next(4)), nil
func (b *IOBuffer) Uint32() (uint32, error) {
if b.Len() > 3 {
return binary.BigEndian.Uint32(b.Next(4)), nil
}
return 0, ErrEOF
return 0, io.EOF
}
func (r *IOBuffer) ReadN(length int) ([]byte, error) {
if r.Len() >= length {
return r.Next(length), nil
func (b *IOBuffer) ReadN(length int) ([]byte, error) {
if b.Len() >= length {
return b.Next(length), nil
}
return nil, ErrEOF
return nil, io.EOF
}
// empty reports whether the unread portion of the buffer is empty.
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }
func (b *IOBuffer) ReadByte() (byte, error) {
if b.empty() {
// Buffer is empty, reset to recover space.
b.Reset()
return 0, io.EOF
}
c := b.buf[b.off]
b.off++
return c, nil
}
func (b *IOBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
}
func (b *IOBuffer) Len() int { return len(b.buf) - b.off }
// tryGrowByReslice is a inlineable version of grow for the fast-case where the
// internal buffer only needs to be resliced.
// It returns the index where bytes should be written and whether it succeeded.
func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
if l := len(b.buf); n <= cap(b.buf)-l {
b.buf = b.buf[:l+n]
return l, true
}
return 0, false
}
var ErrTooLarge = errors.New("IOBuffer: too large")
func (b *IOBuffer) Write(p []byte) (n int, err error) {
defer func() {
if recover() != nil {
panic(ErrTooLarge)
}
}()
l := len(p)
oldLen := len(b.buf)
m, ok := b.tryGrowByReslice(l)
if !ok {
buf := make([]byte, oldLen+l)
copy(buf, b.buf[b.off:])
m = oldLen - b.off
b.off = 0
b.buf = buf
}
return copy(b.buf[m:], p), nil
}

150
utils/bufferpool.go Normal file
View File

@@ -0,0 +1,150 @@
package utils
import (
"bytes"
"sort"
"sync"
"sync/atomic"
)
const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
// Pool represents byte buffer pool.
//
// Distinct pools may be used for distinct types of byte buffers.
// Properly determined byte buffer types with their own pools may help reducing
// memory waste.
type Pool struct {
calls [steps]uint64
calibrating uint64
defaultSize uint64
maxSize uint64
pool sync.Pool
}
var defaultPool Pool
// Get returns an empty byte buffer from the pool.
//
// Got byte buffer may be returned to the pool via Put call.
// This reduces the number of memory allocations required for byte buffer
// management.
func Get() *bytes.Buffer { return defaultPool.Get() }
// Get returns new byte buffer with zero length.
//
// The byte buffer may be returned to the pool via Put after the use
// in order to minimize GC overhead.
func (p *Pool) Get() *bytes.Buffer {
v := p.pool.Get()
if v != nil {
return v.(*bytes.Buffer)
}
return bytes.NewBuffer(make([]byte, 0, atomic.LoadUint64(&p.defaultSize)))
}
// Put returns byte buffer to the pool.
//
// bytes.Buffer.B mustn't be touched after returning it to the pool.
// Otherwise data races will occur.
func Put(b *bytes.Buffer) { b.Reset(); defaultPool.Put(b) }
// Put releases byte buffer obtained via Get to the pool.
//
// The buffer mustn't be accessed after returning to the pool.
func (p *Pool) Put(b *bytes.Buffer) {
idx := index(b.Len())
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || b.Cap() <= maxSize {
b.Reset()
p.pool.Put(b)
}
}
func (p *Pool) calibrate() {
if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
return
}
a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
})
}
sort.Sort(a)
defaultSize := a[0].size
maxSize := defaultSize
maxSum := uint64(float64(callsSum) * maxPercentile)
callsSum = 0
for i := 0; i < steps; i++ {
if callsSum > maxSum {
break
}
callsSum += a[i].calls
size := a[i].size
if size > maxSize {
maxSize = size
}
}
atomic.StoreUint64(&p.defaultSize, defaultSize)
atomic.StoreUint64(&p.maxSize, maxSize)
atomic.StoreUint64(&p.calibrating, 0)
}
type callSize struct {
calls uint64
size uint64
}
type callSizes []callSize
func (ci callSizes) Len() int {
return len(ci)
}
func (ci callSizes) Less(i, j int) bool {
return ci[i].calls > ci[j].calls
}
func (ci callSizes) Swap(i, j int) {
ci[i], ci[j] = ci[j], ci[i]
}
func index(n int) int {
n--
n >>= minBitSize
idx := 0
for n > 0 {
n >>= 1
idx++
}
if idx >= steps {
idx = steps - 1
}
return idx
}

View File

@@ -2,8 +2,8 @@ package utils
import (
"errors"
"github.com/Monibuca/utils/v3"
"github.com/logrusorgru/aurora"
)
//
@@ -140,10 +140,10 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
return dec.ReadN(int(payloadlen))
}
//read the buffer and push video or audio
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
var nextStartCode uint32
again:
dec.clean()
if err := dec.Skip(9); err != nil {
return err
}
@@ -157,20 +157,11 @@ func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
return err
}
var video []byte
var videoTs, videoCts uint32
defer func() {
if video != nil {
pusher.PushVideo(videoTs, videoCts, video)
video = nil
}
if nextStartCode == StartCodePS {
err = dec.Read(ts, pusher)
}
}()
var nextStartCode, videoTs, videoCts uint32
loop:
for err == nil {
nextStartCode, err = dec.Uint32()
if err != nil {
return err
if nextStartCode, err = dec.Uint32(); err != nil {
break
}
switch nextStartCode {
case StartCodeSYS:
@@ -181,7 +172,7 @@ func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
case StartCodeVideo:
var cts uint32
if err = dec.decPESPacket(); err == nil {
if video == nil {
if len(video) == 0 {
if dec.PTS == 0 {
dec.PTS = ts
}
@@ -193,27 +184,33 @@ func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
videoTs = dec.DTS / 90
videoCts = cts / 90
}
video = append(video, dec.Payload...)
video = append (video,dec.Payload...)
} else {
utils.Println("video", err)
}
case StartCodeAudio:
if err = dec.decPESPacket(); err == nil {
var payload []byte
ts := ts / 90
ts := ts / 90
if dec.PTS != 0 {
ts = dec.PTS / 90
}
pusher.PushAudio(ts, append(payload, dec.Payload...))
pusher.PushAudio(ts, dec.Payload)
} else {
utils.Println("audio", err)
}
case StartCodePS:
return nil
break loop
default:
dec.ReadPayload()
}
}
if len(video) > 0 {
pusher.PushVideo(videoTs, videoCts, video)
}
if nextStartCode == StartCodePS {
utils.Println(aurora.Red("StartCodePS recursion..."), err)
goto again
}
return err
}

View File

@@ -1,8 +1,10 @@
package utils
import (
"errors"
"fmt"
"math/rand"
"net"
"runtime"
"time"
)
@@ -45,8 +47,67 @@ func randStringBySoure(src string, n int) string {
return string(output)
}
// Error Error
type Error struct {
err error
params []interface{}
}
func (err *Error) Error() string {
if err == nil {
return "<nil>"
}
str := fmt.Sprint(err.params...)
if err.err != nil {
str += fmt.Sprintf(" err:%s", err.err.Error())
}
return str
}
// NewError NewError
func NewError(err error, params ...interface{}) error {
return &Error{err, params}
}
func PrintStack() {
var buf [4096]byte
n := runtime.Stack(buf[:], false)
fmt.Printf("==> %s\n", string(buf[:n]))
}
// ResolveSelfIP ResolveSelfIP
func ResolveSelfIP() (net.IP, error) {
ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}
for _, iface := range ifaces {
if iface.Flags&net.FlagUp == 0 {
continue // interface down
}
if iface.Flags&net.FlagLoopback != 0 {
continue // loopback interface
}
addrs, err := iface.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil || ip.IsLoopback() {
continue
}
ip = ip.To4()
if ip == nil {
continue // not an ipv4 address
}
return ip, nil
}
}
return nil, errors.New("server not connected to any network")
}