Compare commits

..

5 Commits

Author SHA1 Message Date
dexter
3a704b68cc 采用引擎处理ps 2023-02-20 00:20:20 +08:00
charlestamz
c8f51a7ec5 修复:gosip不支持go标准的监听格式 2023-02-19 23:57:48 +08:00
charlestamz
b7bad99292 优化,消除一些低级错误 2023-02-19 23:45:36 +08:00
charlestamz
7b6b827899 修复多次发送Invite和Catalog得问题 2023-02-19 15:25:18 +08:00
charlestamz
d121927c96 优化GB插件,修复一些问题 2023-02-17 23:19:02 +08:00
9 changed files with 212 additions and 205 deletions

View File

@@ -26,7 +26,7 @@ type ChannelEx struct {
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
liveInviteLock sync.Mutex
liveInviteLock *sync.Mutex
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
@@ -50,33 +50,11 @@ type Channel struct {
Secrecy int
Status string
Children []*Channel `json:"-"`
*ChannelEx //自定义属性
ChannelEx //自定义属性
}
func (c *Channel) Copy(v *Channel) {
if v == nil {
return
}
c.DeviceID = v.DeviceID
c.ParentID = v.ParentID
c.Name = v.Name
c.Manufacturer = v.Manufacturer
c.Model = v.Model
c.Owner = v.Owner
c.CivilCode = v.CivilCode
c.Address = v.Address
c.Parental = v.Parental
c.SafetyWay = v.SafetyWay
c.RegisterWay = v.RegisterWay
c.Secrecy = v.Secrecy
c.Status = v.Status
c.Status = v.Status
c.ChannelEx = v.ChannelEx
}
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := c.device
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := channel.device
d.sn++
callId := sip.CallID(utils.RandNumString(10))
@@ -98,15 +76,15 @@ func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
}
//非同一域的目标地址需要使用@host
host := conf.Realm
if c.DeviceID[0:9] != host {
if channel.DeviceID[0:9] != host {
deviceIp := d.NetAddr
deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
host = fmt.Sprintf("%s:%d", deviceIp, c.Port)
host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
}
channelAddr := sip.Address{
//DisplayName: sip.String{Str: d.serverConfig.Serial},
Uri: &sip.SipUri{FUser: sip.String{Str: c.DeviceID}, FHost: host},
Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
}
req = sip.NewRequest(
"",
@@ -234,7 +212,8 @@ func (o *InviteOptions) CreateSSRC() {
o.SSRC = uint32(_ssrc)
}
/*
//Invite 发送Invite报文注意里面的锁保证不同时发送invite报文该锁由channel持有
/***
f字段 f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
各项具体含义:
v后续参数为视频的参数各参数间以 “/”分割;
@@ -369,7 +348,8 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
invite.AppendHeader(&subject)
publisher.inviteRes, err = d.SipRequestForResponse(invite)
if err != nil {
return http.StatusRequestTimeout, err
plugin.Error(fmt.Sprintf("SIP->Invite %s :%s invite error: %s", channel.DeviceID, invite.String(), err.Error()))
return http.StatusInternalServerError, err
}
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))

141
device.go
View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"golang.org/x/exp/maps"
"net/http"
"os"
"strings"
@@ -55,7 +56,6 @@ type Device struct {
UpdateTime time.Time
LastKeepaliveAt time.Time
Status string
Channels []*Channel
sn int
addr sip.Address
sipIP string //设备对应网卡的服务器ip
@@ -67,12 +67,23 @@ type Device struct {
CallID string
Timeout time.Time
}
GpsTime time.Time //gps时间
Longitude string //经度
Latitude string //
lastSyncTime time.Time
GpsTime time.Time //gps时间
Longitude string //
Latitude string //纬度
}
func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
func (d *Device) MarshalJSON() ([]byte, error) {
type Alias Device
return json.Marshal(&struct {
Channels []*Channel
*Alias
}{
Channels: maps.Values(d.channelMap),
Alias: (*Alias)(d),
})
}
func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
from, _ := req.From()
d.addr = sip.Address{
DisplayName: from.DisplayName,
@@ -81,7 +92,7 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
deviceIp := req.Source()
servIp := req.Recipient().Host()
//根据网卡ip获取对应的公网ip
sipIP := config.routes[servIp]
sipIP := c.routes[servIp]
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
@@ -89,14 +100,14 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
}
}
//如果用户配置过则使用配置的
if config.SipIP != "" {
sipIP = config.SipIP
if c.SipIP != "" {
sipIP = c.SipIP
} else if sipIP == "" {
sipIP = myip.InternalIPv4()
}
mediaIp := sipIP
if config.MediaIP != "" {
mediaIp = config.MediaIP
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Status = string(sip.REGISTER)
@@ -105,10 +116,9 @@ func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
d.NetAddr = deviceIp
d.UpdateTime = time.Now()
d.channelMap = make(map[string]*Channel)
go d.Catalog()
}
func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
var d *Device
func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
from, _ := req.From()
deviceAddr := sip.Address{
DisplayName: from.DisplayName,
@@ -124,7 +134,7 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
} else {
servIp := req.Recipient().Host()
//根据网卡ip获取对应的公网ip
sipIP := config.routes[servIp]
sipIP := c.routes[servIp]
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
@@ -132,14 +142,14 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
}
}
//如果用户配置过则使用配置的
if config.SipIP != "" {
sipIP = config.SipIP
if c.SipIP != "" {
sipIP = c.SipIP
} else if sipIP == "" {
sipIP = myip.InternalIPv4()
}
mediaIp := sipIP
if config.MediaIP != "" {
mediaIp = config.MediaIP
if c.MediaIP != "" {
mediaIp = c.MediaIP
}
plugin.Info("StoreDevice", zap.String("id", id), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d = &Device{
@@ -154,11 +164,11 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
channelMap: make(map[string]*Channel),
}
Devices.Store(id, d)
SaveDevices()
go d.Catalog()
c.SaveDevices()
}
return
}
func ReadDevices() {
func (c *GB28181Config) ReadDevices() {
if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
defer f.Close()
var items []*Device
@@ -172,7 +182,7 @@ func ReadDevices() {
}
}
}
func SaveDevices() {
func (c *GB28181Config) SaveDevices() {
var item []any
Devices.Range(func(key, value any) bool {
item = append(item, value)
@@ -186,19 +196,33 @@ func SaveDevices() {
}
}
func (d *Device) addChannel(channel *Channel) {
for _, c := range d.Channels {
if c.DeviceID == channel.DeviceID {
return
}
func (d *Device) addOrUpdateChannel(channel *Channel) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
channel.device = d
var oldLock *sync.Mutex
if old, ok := d.channelMap[channel.DeviceID]; ok {
//复制锁指针
oldLock = old.liveInviteLock
}
d.Channels = append(d.Channels, channel)
if oldLock == nil {
channel.liveInviteLock = &sync.Mutex{}
} else {
channel.liveInviteLock = oldLock
}
d.channelMap[channel.DeviceID] = channel
}
func (d *Device) deleteChannel(DeviceID string) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
delete(d.channelMap, DeviceID)
}
func (d *Device) CheckSubStream() {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
for _, c := range d.Channels {
for _, c := range d.channelMap {
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c.LiveSubSP = s.Path
} else {
@@ -207,42 +231,34 @@ func (d *Device) CheckSubStream() {
}
}
func (d *Device) UpdateChannels(list []*Channel) {
d.channelMutex.Lock()
defer d.channelMutex.Unlock()
for _, c := range list {
if _, ok := conf.Ignores[c.DeviceID]; ok {
continue
}
//当父设备非空且存在时、父设备节点增加通道
if c.ParentID != "" {
path := strings.Split(c.ParentID, "/")
parentId := path[len(path)-1]
if parent, ok := d.channelMap[parentId]; ok {
if c.DeviceID != parentId {
parent.Children = append(parent.Children, c)
if c.DeviceID != parentId {
if v, ok := Devices.Load(parentId); ok {
parent := v.(*Device)
parent.addOrUpdateChannel(c)
continue
}
} else {
d.addChannel(c)
}
} else {
d.addChannel(c)
}
if old, ok := d.channelMap[c.DeviceID]; ok {
c.ChannelEx = old.ChannelEx
if conf.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
//本设备增加通道
d.addOrUpdateChannel(c)
//预取和邀请
if conf.PreFetchRecord {
n := time.Now()
n = time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.Local)
if len(c.Records) == 0 || (n.Format(TIME_LAYOUT) == c.RecordStartTime &&
n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT) == c.RecordEndTime) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
old.Copy(c)
c = old
} else {
c.ChannelEx = &ChannelEx{
device: d,
}
d.channelMap[c.DeviceID] = c
}
if conf.AutoInvite && (c.LivePublisher == nil) {
go c.Invite(InviteOptions{})
@@ -356,6 +372,7 @@ func (d *Device) Subscribe() int {
}
func (d *Device) Catalog() int {
//os.Stdout.Write(debug.Stack())
request := d.CreateRequest(sip.MESSAGE)
expires := sip.Expires(3600)
d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
@@ -373,10 +390,9 @@ func (d *Device) Catalog() int {
return http.StatusRequestTimeout
}
func (d *Device) QueryDeviceInfo(req *sip.Request) {
func (d *Device) QueryDeviceInfo() {
for i := time.Duration(5); i < 100; i++ {
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s", d.ID, d.NetAddr))
time.Sleep(time.Second * i)
request := d.CreateRequest(sip.MESSAGE)
contentType := sip.ContentType("Application/MANSCDP+xml")
@@ -391,10 +407,8 @@ func (d *Device) QueryDeviceInfo(req *sip.Request) {
// received, _ := via.Params.Get("received")
// d.SipIP = received.String()
// }
if response.StatusCode() != 200 {
plugin.Sugar().Errorf("device %s send Catalog : %d\n", d.ID, response.StatusCode())
} else {
d.Subscribe()
plugin.Info(fmt.Sprintf("QueryDeviceInfo:%s ipaddr:%s response code:%d", d.ID, d.NetAddr, response.StatusCode()))
if response.StatusCode() == 200 {
break
}
}
@@ -467,7 +481,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := &Channel{
channel := Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -483,12 +497,11 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
channels := []*Channel{channel}
d.UpdateChannels(channels)
d.addOrUpdateChannel(&channel)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
d.channelOffline(v.DeviceID)
d.deleteChannel(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
// 更新通道

View File

@@ -52,15 +52,14 @@ func (a *Authorization) getDigest(raw string) string {
}
}
func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
id := from.Address.User().String()
plugin.Debug(id)
plugin.Sugar().Debugf("OnRegister: %s, %s from %s ", req.Destination(), id, req.Source())
passAuth := false
// 不需要密码情况
if config.Username == "" && config.Password == "" {
if c.Username == "" && c.Password == "" {
passAuth = true
} else {
// 需要密码情况 设备第一次上报返回401和加密算法
@@ -73,7 +72,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
if auth.Username() == id {
username = id
} else {
username = config.Username
username = c.Username
}
if dc, ok := DeviceRegisterCount.LoadOrStore(id, 1); ok && dc.(int) > MaxRegisterCount {
@@ -83,7 +82,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
} else {
// 设备第二次上报,校验
_nonce, loaded := DeviceNonce.Load(id)
if loaded && auth.Verify(username, config.Password, config.Realm, _nonce.(string)) {
if loaded && auth.Verify(username, c.Password, c.Realm, _nonce.(string)) {
passAuth = true
} else {
DeviceRegisterCount.Store(id, dc.(int)+1)
@@ -92,7 +91,13 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
}
}
if passAuth {
config.StoreDevice(id, req)
var d *Device
if v, ok := Devices.Load(id); ok {
d = v.(*Device)
c.RecoverDevice(d, req)
} else {
d = c.StoreDevice(id, req)
}
DeviceNonce.Delete(id)
DeviceRegisterCount.Delete(id)
resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
@@ -106,12 +111,14 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
Contents: time.Now().Format(TIME_LAYOUT),
})
_ = tx.Respond(resp)
//订阅设备更新
go d.syncChannels()
} else {
response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
_nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
auth := fmt.Sprintf(
`Digest realm="%s",algorithm=%s,nonce="%s"`,
config.Realm,
c.Realm,
"MD5",
_nonce.(string),
)
@@ -122,18 +129,31 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
_ = tx.Respond(response)
}
}
func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
// syncChannels
// 同步设备信息、下属通道信息,包括主动查询通道信息,订阅通道变化情况
func (d *Device) syncChannels() {
if time.Since(d.lastSyncTime) > 2*conf.HeartbeatInterval {
d.lastSyncTime = time.Now()
d.QueryDeviceInfo()
d.Catalog()
d.Subscribe()
}
}
func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
id := from.Address.User().String()
plugin.Sugar().Debugf("SIP<-OnMessage from %s : %s", req.Source(), req.String())
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
switch d.Status {
case "RECOVER":
config.RecoverDevice(d, req)
return
c.RecoverDevice(d, req)
go d.syncChannels()
//return
case string(sip.REGISTER):
d.Status = "ONLINE"
//go d.QueryDeviceInfo(req)
}
d.UpdateTime = time.Now()
temp := &struct {
@@ -161,25 +181,20 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
case "Keepalive":
d.LastKeepaliveAt = time.Now()
//callID !="" 说明是订阅的事件类型信息
if d.Channels == nil {
go d.Catalog()
if d.channelMap == nil || len(d.channelMap) == 0 {
go d.syncChannels()
} else {
if d.subscriber.CallID != "" && d.LastKeepaliveAt.After(d.subscriber.Timeout) {
go d.Catalog()
} else {
for _, c := range d.Channels {
if config.AutoInvite &&
(c.LivePublisher == nil) {
c.Invite(InviteOptions{})
}
for _, ch := range d.channelMap {
if c.AutoInvite && (ch.LivePublisher == nil) {
ch.Invite(InviteOptions{})
}
}
}
d.CheckSubStream()
//为什么要查找子码流?
//d.CheckSubStream()
//在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
if config.Position.AutosubPosition && time.Since(d.GpsTime) > config.Position.Interval*2 {
d.MobilePositionSubscribe(d.ID, config.Position.Expires, config.Position.Interval)
if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
plugin.Sugar().Debugf("位置自动订阅,设备[%s]成功\n", d.ID)
}
case "Catalog":
@@ -204,12 +219,12 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
}
}
func (config *GB28181Config) onBye(req sip.Request, tx sip.ServerTransaction) {
func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", ""))
}
// OnNotify 订阅通知处理
func (config *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
from, _ := req.From()
id := from.Address.User().String()
if v, ok := Devices.Load(id); ok {

View File

@@ -32,7 +32,7 @@ type GB28181Config struct {
// AckTimeout uint16 //sip 服务应答超时,单位秒
RegisterValidity time.Duration //注册有效期,单位秒,默认 3600
// RegisterInterval int //注册间隔,单位秒,默认 60
// HeartbeatInterval int //心跳间隔,单位秒,默认 60
HeartbeatInterval time.Duration //心跳间隔,单位秒,默认 60
// HeartbeatRetry int //心跳超时次数,默认 3
//媒体服务器配置
@@ -70,7 +70,7 @@ func (c *GB28181Config) initRoutes() {
func (c *GB28181Config) OnEvent(event any) {
switch event.(type) {
case FirstConfig:
ReadDevices()
c.ReadDevices()
go c.initRoutes()
c.startServer()
}
@@ -95,7 +95,7 @@ var conf = &GB28181Config{
// AckTimeout: 10,
RegisterValidity: 60 * time.Second,
// RegisterInterval: 60,
// HeartbeatInterval: 60,
HeartbeatInterval: 60 * time.Second,
// HeartbeatRetry: 3,
MediaIP: "",

View File

@@ -13,6 +13,7 @@ import (
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegps"
"m7s.live/engine/v4/codec/mpegts"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
@@ -24,7 +25,7 @@ type GBPublisher struct {
InviteOptions
channel *Channel
inviteRes sip.Response
parser *utils.DecPSPackage
parser mpegps.MpegPsStream
lastSeq uint16
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
@@ -41,6 +42,7 @@ func (p *GBPublisher) PrintDump(s string) {
func (p *GBPublisher) OnEvent(event any) {
if p.channel == nil {
p.parser.EsHandler = p
p.IO.OnEvent(event)
return
}
@@ -53,6 +55,7 @@ func (p *GBPublisher) OnEvent(event any) {
p.Type = "GB28181 Playback"
p.channel.RecordPublisher = p
}
p.parser.EsHandler = p
conf.publishers.Add(p.SSRC, p)
if err := error(nil); p.dump != "" {
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
@@ -108,9 +111,9 @@ func (p *GBPublisher) Bye() int {
return int(resp.StatusCode())
}
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
func (p *GBPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
if p.VideoTrack == nil {
switch p.parser.VideoStreamType {
switch es.Type {
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream)
case mpegts.STREAM_TYPE_H265:
@@ -118,7 +121,7 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
default:
//推测编码类型
var maybe264 H264NALUType
maybe264 = maybe264.Parse(payload[4])
maybe264 = maybe264.Parse(es.Buffer[4])
switch maybe264 {
case NALU_Non_IDR_Picture,
NALU_IDR_Picture,
@@ -133,6 +136,7 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
}
}
}
payload, pts, dts := es.Buffer, es.PTS, es.DTS
if len(payload) > 10 {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
} else {
@@ -146,25 +150,26 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
// }
p.WriteAnnexB(pts, dts, payload)
}
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
func (p *GBPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
ts, payload := es.PTS, es.Buffer
if p.AudioTrack == nil {
switch p.parser.AudioStreamType {
switch es.Type {
case mpegts.STREAM_TYPE_G711A:
p.AudioTrack = NewG711(p.Publisher.Stream, true, uint32(90000))
p.AudioTrack = NewG711(p.Publisher.Stream, true)
case mpegts.STREAM_TYPE_G711U:
p.AudioTrack = NewG711(p.Publisher.Stream, false, uint32(90000))
p.AudioTrack = NewG711(p.Publisher.Stream, false)
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream, uint32(90000))
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(ts, payload)
case 0: //推测编码类型
if payload[0] == 0xff && payload[1]>>4 == 0xf {
p.AudioTrack = NewAAC(p.Publisher.Stream, uint32(90000))
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(ts, payload)
}
default:
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
}
} else if p.parser.AudioStreamType == mpegts.STREAM_TYPE_AAC {
} else if es.Type == mpegts.STREAM_TYPE_AAC {
p.WriteADTS(ts, payload)
} else {
p.WriteRaw(ts, payload)
@@ -173,11 +178,8 @@ func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
if p.parser == nil {
p.parser = utils.NewDecPSPackage(p)
}
if conf.IsMediaNetworkTCP() {
p.parser.Feed(rtp)
p.parser.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber
} else {
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
@@ -188,7 +190,7 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
p.SetLostFlag()
}
}
p.parser.Feed(rtp)
p.parser.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber
}
}

View File

@@ -10,11 +10,11 @@ import (
"m7s.live/engine/v4/util"
)
func (conf *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
util.ReturnJson(func() (list []*Device) {
Devices.Range(func(key, value interface{}) bool {
device := value.(*Device)
if time.Since(device.UpdateTime) > conf.RegisterValidity {
if time.Since(device.UpdateTime) > c.RegisterValidity {
Devices.Delete(key)
} else {
list = append(list, device)
@@ -25,7 +25,7 @@ func (conf *GB28181Config) API_list(w http.ResponseWriter, r *http.Request) {
}, time.Second*5, w, r)
}
func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
startTime := r.URL.Query().Get("startTime")
@@ -37,7 +37,7 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
}
}
func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
ptzcmd := r.URL.Query().Get("ptzcmd")
@@ -48,7 +48,7 @@ func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
}
}
func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
id := query.Get("id")
channel := query.Get("channel")
@@ -69,11 +69,11 @@ func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
}
}
func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
dump := r.URL.Query().Get("dump")
printOut := r.URL.Query().Get("print")
if dump == "" {
dump = conf.DumpPath
dump = c.DumpPath
}
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
if err != nil {
@@ -102,7 +102,7 @@ func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
}
}
func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
// CORS(w, r)
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
@@ -114,7 +114,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
}
}
func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_position(w http.ResponseWriter, r *http.Request) {
//CORS(w, r)
query := r.URL.Query()
//设备id
@@ -126,11 +126,11 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
expiresInt, err := time.ParseDuration(expires)
if expires == "" || err != nil {
expiresInt = conf.Position.Expires
expiresInt = c.Position.Expires
}
intervalInt, err := time.ParseDuration(interval)
if interval == "" || err != nil {
intervalInt = conf.Position.Interval
intervalInt = c.Position.Interval
}
if v, ok := Devices.Load(id); ok {
@@ -148,7 +148,7 @@ type DevicePosition struct {
Latitude string //纬度
}
func (conf *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Request) {
func (c *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
//设备id
id := query.Get("id")
@@ -157,7 +157,7 @@ func (conf *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Reque
if id == "" {
Devices.Range(func(key, value interface{}) bool {
d := value.(*Device)
if time.Since(d.GpsTime) <= conf.Position.Interval {
if time.Since(d.GpsTime) <= c.Position.Interval {
list = append(list, &DevicePosition{ID: d.ID, GpsTime: d.GpsTime, Longitude: d.Longitude, Latitude: d.Latitude})
}
return true
@@ -167,5 +167,5 @@ func (conf *GB28181Config) API_get_position(w http.ResponseWriter, r *http.Reque
list = append(list, &DevicePosition{ID: d.ID, GpsTime: d.GpsTime, Longitude: d.Longitude, Latitude: d.Latitude})
}
return
}, conf.Position.Interval, w, r)
}, c.Position.Interval, w, r)
}

View File

@@ -96,51 +96,51 @@ var levelMap = map[string]log.Level{
"panic": log.PanicLevel,
}
func (config *GB28181Config) startServer() {
config.publishers.Init()
addr := "0.0.0.0:" + strconv.Itoa(int(config.SipPort))
func (c *GB28181Config) startServer() {
c.publishers.Init()
addr := "0.0.0.0:" + strconv.Itoa(int(c.SipPort))
logger := utils.NewZapLogger(plugin.Logger, "GB SIP Server", nil)
logger.SetLevel(levelMap[config.LogLevel])
logger.SetLevel(levelMap[c.LogLevel])
// logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
srvConf := gosip.ServerConfig{}
if config.SipIP != "" {
srvConf.Host = config.SipIP
if c.SipIP != "" {
srvConf.Host = c.SipIP
}
srv = gosip.NewServer(srvConf, nil, nil, logger)
srv.OnRequest(sip.REGISTER, config.OnRegister)
srv.OnRequest(sip.MESSAGE, config.OnMessage)
srv.OnRequest(sip.NOTIFY, config.OnNotify)
srv.OnRequest(sip.BYE, config.onBye)
err := srv.Listen(strings.ToLower(config.SipNetwork), addr)
srv.OnRequest(sip.REGISTER, c.OnRegister)
srv.OnRequest(sip.MESSAGE, c.OnMessage)
srv.OnRequest(sip.NOTIFY, c.OnNotify)
srv.OnRequest(sip.BYE, c.OnBye)
err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
if err != nil {
plugin.Logger.Error("gb28181 server listen", zap.Error(err))
} else {
plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
}
go config.startMediaServer()
go c.startMediaServer()
if config.Username != "" || config.Password != "" {
go removeBanDevice(config)
if c.Username != "" || c.Password != "" {
go c.removeBanDevice()
}
}
func (config *GB28181Config) startMediaServer() {
if config.MediaNetwork == "tcp" {
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.tcpPorts.Valid {
config.listenMediaTCP()
func (c *GB28181Config) startMediaServer() {
if c.MediaNetwork == "tcp" {
c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.tcpPorts.Valid {
c.listenMediaTCP()
}
} else {
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.udpPorts.Valid {
config.listenMediaUDP()
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
if !c.udpPorts.Valid {
c.listenMediaUDP()
}
}
}
func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
func (c *GB28181Config) processTcpMediaConn(conn net.Conn) {
var rtpPacket rtp.Packet
reader := bufio.NewReader(conn)
lenBuf := make([]byte, 2)
@@ -156,14 +156,14 @@ func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
}
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("gb28181 decode rtp error:", zap.Error(err))
} else if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
} else if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
publisher.PushPS(&rtpPacket)
}
}
}
func (config *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(config.MediaPort))
func (c *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
@@ -171,24 +171,24 @@ func (config *GB28181Config) listenMediaTCP() {
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return
}
plugin.Info("Media tcp server start.", zap.Uint16("port", config.MediaPort))
plugin.Info("Media tcp server start.", zap.Uint16("port", c.MediaPort))
defer listen.Close()
defer plugin.Info("Media tcp server stop", zap.Uint16("port", config.MediaPort))
defer plugin.Info("Media tcp server stop", zap.Uint16("port", c.MediaPort))
for {
conn, err := listen.Accept()
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
}
go processTcpMediaConn(config, conn)
go c.processTcpMediaConn(conn)
}
}
func (config *GB28181Config) listenMediaUDP() {
func (c *GB28181Config) listenMediaUDP() {
var rtpPacket rtp.Packet
networkBuffer := 1048576
addr := ":" + strconv.Itoa(int(config.MediaPort))
addr := ":" + strconv.Itoa(int(c.MediaPort))
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
@@ -197,15 +197,15 @@ func (config *GB28181Config) listenMediaUDP() {
return
}
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", config.MediaPort))
defer plugin.Info("Media udp server stop", zap.Uint16("port", config.MediaPort))
plugin.Info("Media udp server start.", zap.Uint16("port", c.MediaPort))
defer plugin.Info("Media udp server stop", zap.Uint16("port", c.MediaPort))
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
if publisher := c.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
if publisher.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if publisher.lastReceive.IsZero() {
@@ -237,8 +237,8 @@ func (config *GB28181Config) listenMediaUDP() {
// }
// }
func removeBanDevice(config *GB28181Config) {
t := time.NewTicker(config.RemoveBanInterval)
func (c *GB28181Config) removeBanDevice() {
t := time.NewTicker(c.RemoveBanInterval)
for range t.C {
DeviceRegisterCount.Range(func(key, value interface{}) bool {
if value.(int) > MaxRegisterCount {

View File

@@ -47,12 +47,12 @@ func (b *IOBuffer) ReadN(length int) ([]byte, error) {
return nil, io.EOF
}
func (b *IOBuffer) Read(buf []byte) (n int, err error) {
var ret []byte
ret, err = b.ReadN(len(buf))
copy(buf, ret)
return len(ret), err
}
//func (b *IOBuffer) Read(buf []byte) (n int, err error) {
// var ret []byte
// ret, err = b.ReadN(len(buf))
// copy(buf, ret)
// return len(ret), err
//}
// empty reports whether the unread portion of the buffer is empty.
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }

View File

@@ -192,12 +192,9 @@ func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
return err
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
if err == nil {
psl &= 0x07
dec.Skip(int(psl))
}
if len(dec.videoBuffer) > 0 {
dec.PushVideo(dec.vPTS, dec.vDTS, dec.videoBuffer)