Compare commits

...

4 Commits

Author SHA1 Message Date
dexter
f3046bcde3 将ps发布逻辑移入引擎中 2023-02-28 20:13:19 +08:00
charlestamz
9c970ad282 注释暂不处理级联目录信息 2023-02-27 14:25:41 +08:00
charlestamz
c4de92e9f6 修复MediaPortMin无法使用的问题 2023-02-22 23:44:44 +08:00
charlestamz
cf5a803971 优化Invite,代码优化 2023-02-22 23:13:08 +08:00
9 changed files with 251 additions and 232 deletions

View File

@@ -1,9 +1,7 @@
package gb28181
import (
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
@@ -155,64 +153,8 @@ func (channel *Channel) Control(PTZCmd string) int {
return int(resp.StatusCode())
}
type InviteOptions struct {
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
}
func (o InviteOptions) IsLive() bool {
return o.Start == 0 || o.End == 0
}
func (o InviteOptions) Record() bool {
return !o.IsLive()
}
func (o *InviteOptions) Validate(start, end string) error {
if start != "" {
sint, err1 := strconv.ParseInt(start, 10, 0)
if err1 != nil {
return err1
}
o.Start = int(sint)
}
if end != "" {
eint, err2 := strconv.ParseInt(end, 10, 0)
if err2 != nil {
return err2
}
o.End = int(eint)
}
if o.Start >= o.End {
return errors.New("start < end")
}
return nil
}
func (o InviteOptions) String() string {
return fmt.Sprintf("t=%d %d", o.Start, o.End)
}
func (o *InviteOptions) CreateSSRC() {
ssrc := make([]byte, 10)
if o.IsLive() {
ssrc[0] = '0'
} else {
ssrc[0] = '1'
}
copy(ssrc[1:6], conf.Serial[3:8])
randNum := 1000 + rand.Intn(8999)
copy(ssrc[6:], strconv.Itoa(randNum))
o.ssrc = string(ssrc)
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
o.SSRC = uint32(_ssrc)
}
//Invite 发送Invite报文注意里面的锁保证不同时发送invite报文该锁由channel持有
// Invite 发送Invite报文 invites a channel to play
// 注意里面的锁保证不同时发送invite报文该锁由channel持有
/***
f字段 f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
各项具体含义:
@@ -257,13 +199,13 @@ f = v/a/编码格式/码率大小/采样率
f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
if opt.IsLive() {
if !channel.liveInviteLock.TryLock() {
return 304, nil
}
defer func() {
if code != 200 {
if code != OK {
channel.liveInviteLock.Unlock()
}
}()
@@ -280,10 +222,6 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
if opt.dump == "" {
opt.dump = conf.DumpPath
}
// size := 1
// fps := 15
// bitrate := 200
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
@@ -294,32 +232,24 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
if conf.tcpPorts.Valid {
opt.MediaPort, err = publisher.ListenTCP()
if err != nil {
return 500, err
return ServerInternalError, err
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
publisher.DisableReorder = true
} else {
if conf.udpPorts.Valid {
opt.MediaPort, err = publisher.ListenUDP()
if err != nil {
code = 500
code = ServerInternalError
return
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
}
// if opt.MediaPort == 0 {
// opt.MediaPort = conf.MediaPort
// if conf.IsMediaNetworkTCP() {
// protocol = "TCP/"
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
// channel.tcpPortIndex = 0
// }
// }
// }
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
@@ -353,7 +283,8 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
}
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
if code == 200 {
if code == OK {
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
@@ -371,14 +302,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
publisher.udpCache = utils.NewPqRtp()
}
if err = plugin.Publish(streamPath, publisher); err != nil {
code = 403
code = ServerInternalError
return
}
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
srv.Send(ack)
} else if opt.IsLive() && conf.AutoInvite {
} else if channel.CanInvite() {
time.AfterFunc(time.Second*5, func() {
channel.Invite(InviteOptions{})
channel.TryAutoInvite()
})
}
return
@@ -398,3 +329,38 @@ func (channel *Channel) Bye(live bool) int {
}
return 404
}
func (channel *Channel) TryAutoInvite() {
if conf.AutoInvite && channel.CanInvite() {
go channel.Invite(&InviteOptions{})
}
}
func (channel *Channel) CanInvite() bool {
if channel.LivePublisher != nil || len(channel.DeviceID) != 20 || channel.Status == "OFF" {
return false
}
if conf.InviteIDs == "" {
return true
}
// 1113位是设备类型编码
typeID := channel.DeviceID[10:13]
// format: start-end,type1,type2
tokens := strings.Split(conf.InviteIDs, ",")
for _, tok := range tokens {
if first, second, ok := strings.Cut(tok, "-"); ok {
if typeID >= first && typeID <= second {
return true
}
} else {
if typeID == first {
return true
}
}
}
return false
}

115
const.go Normal file
View File

@@ -0,0 +1,115 @@
package gb28181
const (
Trying = 100
Ringing = 180
CallIsBeingForwarded = 181
Queued = 182
SessionProgress = 183
OK = 200
Accepted = 202
MultipleChoices = 300
MovedPermanently = 301
MovedTemporarily = 302
UseProxy = 305
AlternativeService = 380
BadRequest = 400
Unauthorized = 401
PaymentRequired = 402
Forbidden = 403
NotFound = 404
MethodNotAllowed = 405
NotAcceptable = 406
ProxyAuthenticationRequired = 407
RequestTimeout = 408
Gone = 410
RequestEntityTooLarge = 413
RequestURITooLong = 414
UnsupportedMediaType = 415
UnsupportedURIScheme = 416
BadExtension = 420
ExtensionRequired = 421
IntervalTooBrief = 423
TemporarilyUnavailable = 480
CallTransactionDoesNotExist = 481
LoopDetected = 482
TooManyHops = 483
AddressIncomplete = 484
Ambiguous = 485
BusyHere = 486
RequestTerminated = 487
NotAcceptableHere = 488
BadEvent = 489
RequestPending = 491
Undecipherable = 493
ServerInternalError = 500
NotImplemented = 501
BadGateway = 502
ServiceUnavailable = 503
ServerTim = 504
VersionNotSupported = 505
MessageTooLarge = 513
BusyEverywhere = 600
Decline = 603
DoesNotExistAnywhere = 604
SessionNotAcceptable = 606
)
var reasons = map[int]string{
100: "Trying",
180: "Ringing",
181: "Call Is Being Forwarded",
182: "Queued",
183: "Session Progress",
200: "OK",
202: "Accepted",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Moved Temporarily",
305: "Use Proxy",
380: "Alternative Service",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Timeout",
410: "Gone",
413: "Request Entity Too Large",
414: "Request-URI Too Long",
415: "Unsupported Media Type",
416: "Unsupported URI Scheme",
420: "Bad Extension",
421: "Extension Required",
423: "Interval Too Brief",
480: "Temporarily Unavailable",
481: "Call transaction Does Not Exist",
482: "Loop Detected",
483: "Too Many Hops",
484: "Address Incomplete",
485: "Ambiguous",
486: "Busy Here",
487: "Request Terminated",
488: "Not Acceptable Here",
489: "Bad Event",
491: "Request Pending",
493: "Undecipherable",
500: "Server Internal Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Server Tim",
505: "Version Not Supported",
513: "message Too Large",
600: "Busy Everywhere",
603: "Decline",
604: "Does Not Exist Anywhere",
606: "SESSION NOT ACCEPTABLE",
}
func Explain(statusCode int) string {
return reasons[statusCode]
}

View File

@@ -241,12 +241,14 @@ func (d *Device) UpdateChannels(list []*Channel) {
if c.ParentID != "" {
path := strings.Split(c.ParentID, "/")
parentId := path[len(path)-1]
if c.DeviceID != parentId {
if v, ok := Devices.Load(parentId); ok {
parent := v.(*Device)
parent.addOrUpdateChannel(c)
continue
}
//如果父ID并非本身所属设备一般情况下这是因为下级设备上传了目录信息该信息通常不需要处理。
// 暂时不考虑级联目录的实现
if d.ID != parentId {
//if v, ok := Devices.Load(parentId); ok {
// parent := v.(*Device)
// parent.addOrUpdateChannel(c)
continue
//}
}
}
//本设备增加通道
@@ -261,7 +263,7 @@ func (d *Device) UpdateChannels(list []*Channel) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
conf.TryAutoInvite(c)
c.TryAutoInvite()
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c.LiveSubSP = s.Path
} else {
@@ -359,7 +361,7 @@ func (d *Device) Subscribe() int {
response, err := d.SipRequestForResponse(request)
if err == nil && response != nil {
if response.StatusCode() == 200 {
if response.StatusCode() == OK {
callId, _ := request.CallID()
d.subscriber.CallID = string(*callId)
} else {
@@ -407,7 +409,7 @@ func (d *Device) QueryDeviceInfo() {
// d.SipIP = received.String()
// }
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
if response.StatusCode() == 200 {
if response.StatusCode() == OK {
break
}
}
@@ -435,7 +437,7 @@ func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, inter
response, err := d.SipRequestForResponse(mobilePosition)
if err == nil && response != nil {
if response.StatusCode() == 200 {
if response.StatusCode() == OK {
callId, _ := mobilePosition.CallID()
d.subscriber.CallID = callId.String()
} else {

View File

@@ -185,7 +185,7 @@ func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
go d.syncChannels()
} else {
for _, ch := range d.channelMap {
c.TryAutoInvite(ch)
ch.TryAutoInvite()
}
}
//为什么要查找子码流?

65
inviteoption.go Normal file
View File

@@ -0,0 +1,65 @@
package gb28181
import (
"errors"
"fmt"
"math/rand"
"strconv"
)
type InviteOptions struct {
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
}
func (o InviteOptions) IsLive() bool {
return o.Start == 0 || o.End == 0
}
func (o InviteOptions) Record() bool {
return !o.IsLive()
}
func (o *InviteOptions) Validate(start, end string) error {
if start != "" {
sint, err1 := strconv.ParseInt(start, 10, 0)
if err1 != nil {
return err1
}
o.Start = int(sint)
}
if end != "" {
eint, err2 := strconv.ParseInt(end, 10, 0)
if err2 != nil {
return err2
}
o.End = int(eint)
}
if o.Start >= o.End {
return errors.New("start < end")
}
return nil
}
func (o InviteOptions) String() string {
return fmt.Sprintf("t=%d %d", o.Start, o.End)
}
func (o *InviteOptions) CreateSSRC() {
ssrc := make([]byte, 10)
if o.IsLive() {
ssrc[0] = '0'
} else {
ssrc[0] = '1'
}
copy(ssrc[1:6], conf.Serial[3:8])
randNum := 1000 + rand.Intn(8999)
copy(ssrc[6:], strconv.Itoa(randNum))
o.ssrc = string(ssrc)
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
o.SSRC = uint32(_ssrc)
}

37
main.go
View File

@@ -19,7 +19,7 @@ type GB28181PositionConfig struct {
type GB28181Config struct {
AutoInvite bool `default:"true"`
PreFetchRecord bool
InviteIDs string
InviteIDs string //按照国标gb28181协议允许邀请的设备类型:132 摄像机 NVR
//sip服务器的配置
SipNetwork string `default:"udp"` //传输协议默认UDP可选TCP
@@ -81,41 +81,6 @@ func (c *GB28181Config) IsMediaNetworkTCP() bool {
return strings.ToLower(c.MediaNetwork) == "tcp"
}
func (c *GB28181Config) TryAutoInvite(ch *Channel) {
if c.AutoInvite && (ch.LivePublisher == nil) && c.CanInvite(ch.DeviceID) {
go ch.Invite(InviteOptions{})
}
}
func (c *GB28181Config) CanInvite(deviceID string) bool {
if len(deviceID) != 20 {
return false
}
if c.InviteIDs == "" {
return true
}
// 1113位是设备类型编码
typeID := deviceID[10:13]
// format: start-end,type1,type2
tokens := strings.Split(c.InviteIDs, ",")
for _, tok := range tokens {
if first, second, ok := strings.Cut(tok, "-"); ok {
if typeID >= first && typeID <= second {
return true
}
} else {
if typeID == first {
return true
}
}
}
return false
}
var conf GB28181Config
var plugin = InstallPlugin(&conf)

View File

@@ -12,26 +12,19 @@ import (
"github.com/pion/rtp/v2"
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegps"
"m7s.live/engine/v4/codec/mpegts"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
)
type GBPublisher struct {
Publisher
InviteOptions
PSPublisher
*InviteOptions
channel *Channel
inviteRes sip.Response
parser mpegps.MpegPsStream
lastSeq uint16
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
reorder util.RTPReorder[*rtp.Packet]
}
func (p *GBPublisher) PrintDump(s string) {
@@ -69,7 +62,7 @@ func (p *GBPublisher) OnEvent(event any) {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(InviteOptions{})
go p.channel.Invite(&InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
@@ -106,100 +99,11 @@ func (p *GBPublisher) Bye() int {
resp, err := p.channel.device.SipRequestForResponse(bye)
if err != nil {
p.Error("Bye", zap.Error(err))
return 500
return ServerInternalError
}
return int(resp.StatusCode())
}
func (p *GBPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if p.VideoTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream)
case mpegts.STREAM_TYPE_H265:
p.VideoTrack = NewH265(p.Publisher.Stream)
default:
//推测编码类型
var maybe264 H264NALUType
maybe264 = maybe264.Parse(es.Buffer[4])
switch maybe264 {
case NALU_Non_IDR_Picture,
NALU_IDR_Picture,
NALU_SEI,
NALU_SPS,
NALU_PPS,
NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
p.VideoTrack = NewH265(p.Publisher.Stream)
}
}
}
payload, pts, dts := es.Buffer, es.PTS, es.DTS
if len(payload) > 10 {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
} else {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
}
if dts == 0 {
dts = pts
}
// if binary.BigEndian.Uint32(payload) != 1 {
// panic("not annexb")
// }
p.WriteAnnexB(pts, dts, payload)
}
func (p *GBPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
ts, payload := es.PTS, es.Buffer
if p.AudioTrack == nil {
switch es.Type {
case mpegts.STREAM_TYPE_G711A:
p.AudioTrack = NewG711(p.Publisher.Stream, true)
case mpegts.STREAM_TYPE_G711U:
p.AudioTrack = NewG711(p.Publisher.Stream, false)
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(ts, payload)
case 0: //推测编码类型
if payload[0] == 0xff && payload[1]>>4 == 0xf {
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(ts, payload)
}
default:
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
}
} else if es.Type == mpegts.STREAM_TYPE_AAC {
p.WriteADTS(ts, payload)
} else {
p.WriteRaw(ts, payload)
}
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
if p.parser.EsHandler == nil {
p.parser.EsHandler = p
p.lastSeq = rtp.SequenceNumber - 1
}
if conf.IsMediaNetworkTCP() {
p.parser.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber
} else {
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
if rtp.SequenceNumber != p.lastSeq+1 {
fmt.Println("drop", rtp.SequenceNumber, p.lastSeq)
p.parser.Drop()
if p.VideoTrack != nil {
p.SetLostFlag()
}
}
p.parser.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber
}
}
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()

View File

@@ -62,7 +62,7 @@ func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
} else if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else if code, err := c.Invite(opt); err == nil {
} else if code, err := c.Invite(&opt); err == nil {
w.WriteHeader(code)
} else {
http.Error(w, err.Error(), code)

View File

@@ -31,7 +31,7 @@ type PortManager struct {
}
func (pm *PortManager) Init(start, end uint16) {
pm.pos = start
pm.pos = start - 1
pm.max = end
if pm.pos > 0 && pm.max > pm.pos {
pm.Valid = true
@@ -146,11 +146,13 @@ func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
lenBuf := make([]byte, 2)
defer conn.Close()
var err error
ps := make(util.Buffer, 1024)
for err == nil {
if _, err = io.ReadFull(reader, lenBuf); err != nil {
return
}
ps := make([]byte, binary.BigEndian.Uint16(lenBuf))
ps.Reset()
ps.Glow(int(binary.BigEndian.Uint16(lenBuf)))
if _, err = io.ReadFull(reader, ps); err != nil {
return
}