mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb6004d6ef | ||
|
|
cce5f67ab9 | ||
|
|
fdfb462d46 | ||
|
|
c05adce562 | ||
|
|
aa3727f582 | ||
|
|
6e8709176e | ||
|
|
3e6c43f6ff | ||
|
|
4a7aa94bd2 | ||
|
|
fd13c6d9ab | ||
|
|
085d413d2b | ||
|
|
4eba0e23f9 | ||
|
|
f0324c4283 | ||
|
|
f6b5f15b83 |
@@ -156,7 +156,7 @@ ptzcmd|是|PTZ控制指令
|
||||
|
||||
### 查询录像
|
||||
|
||||
`/gb28181/api/query/records`
|
||||
`/gb28181/api/records`
|
||||
|
||||
参数名 | 必传 | 含义
|
||||
|----|---|---
|
||||
|
||||
214
channel.go
214
channel.go
@@ -1,15 +1,17 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
)
|
||||
|
||||
@@ -23,7 +25,7 @@ type ChannelEx struct {
|
||||
RecordEndTime string
|
||||
recordStartTime time.Time
|
||||
recordEndTime time.Time
|
||||
state int32
|
||||
liveInviteLock sync.Mutex
|
||||
tcpPortIndex uint16
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
@@ -49,6 +51,28 @@ type Channel struct {
|
||||
*ChannelEx //自定义属性
|
||||
}
|
||||
|
||||
func (c *Channel) Copy(v *Channel) {
|
||||
if v == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.DeviceID = v.DeviceID
|
||||
c.ParentID = v.ParentID
|
||||
c.Name = v.Name
|
||||
c.Manufacturer = v.Manufacturer
|
||||
c.Model = v.Model
|
||||
c.Owner = v.Owner
|
||||
c.CivilCode = v.CivilCode
|
||||
c.Address = v.Address
|
||||
c.Parental = v.Parental
|
||||
c.SafetyWay = v.SafetyWay
|
||||
c.RegisterWay = v.RegisterWay
|
||||
c.Secrecy = v.Secrecy
|
||||
c.Status = v.Status
|
||||
c.Status = v.Status
|
||||
c.ChannelEx = v.ChannelEx
|
||||
}
|
||||
|
||||
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
d := c.device
|
||||
d.sn++
|
||||
@@ -113,7 +137,7 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
|
||||
<EndTime>%s</EndTime>
|
||||
<Secrecy>0</Secrecy>
|
||||
<Type>all</Type>
|
||||
</Query>`, d.sn, d.ID, startTime, endTime)
|
||||
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
|
||||
request.SetBody(body, true)
|
||||
resp, err := d.SipRequestForResponse(request)
|
||||
if err != nil {
|
||||
@@ -132,7 +156,7 @@ func (channel *Channel) Control(PTZCmd string) int {
|
||||
<SN>%d</SN>
|
||||
<DeviceID>%s</DeviceID>
|
||||
<PTZCmd>%s</PTZCmd>
|
||||
</Control>`, d.sn, d.ID, PTZCmd)
|
||||
</Control>`, d.sn, channel.DeviceID, PTZCmd)
|
||||
request.SetBody(body, true)
|
||||
resp, err := d.SipRequestForResponse(request)
|
||||
if err != nil {
|
||||
@@ -141,6 +165,63 @@ func (channel *Channel) Control(PTZCmd string) int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
type InviteOptions struct {
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
MediaPort uint16
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
ssrc := make([]byte, 10)
|
||||
if o.IsLive() {
|
||||
ssrc[0] = '0'
|
||||
} else {
|
||||
ssrc[0] = '1'
|
||||
}
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
o.ssrc = string(ssrc)
|
||||
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
|
||||
o.SSRC = uint32(_ssrc)
|
||||
}
|
||||
|
||||
/*
|
||||
f字段: f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
|
||||
各项具体含义:
|
||||
@@ -185,66 +266,81 @@ f = v/a/编码格式/码率大小/采样率
|
||||
f字段中视、音频参数段之间不需空格分割。
|
||||
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
|
||||
*/
|
||||
func (channel *Channel) Invite(start, end string) (code int) {
|
||||
if start == "" {
|
||||
if !atomic.CompareAndSwapInt32(&channel.state, 0, 1) {
|
||||
return 304
|
||||
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if opt.IsLive() {
|
||||
if !channel.liveInviteLock.TryLock() {
|
||||
return 304, nil
|
||||
}
|
||||
defer func() {
|
||||
if code != 200 {
|
||||
atomic.StoreInt32(&channel.state, 0)
|
||||
channel.liveInviteLock.Unlock()
|
||||
}
|
||||
}()
|
||||
channel.Bye(true)
|
||||
} else {
|
||||
channel.Bye(false)
|
||||
}
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
channel.Bye(opt.IsLive())
|
||||
d := channel.device
|
||||
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
s := "Play"
|
||||
ssrc := make([]byte, 10)
|
||||
if start != "" {
|
||||
if err1 != nil || err2 != nil {
|
||||
return 400
|
||||
}
|
||||
opt.CreateSSRC()
|
||||
if opt.Record() {
|
||||
s = "Playback"
|
||||
ssrc[0] = '1'
|
||||
streamPath = fmt.Sprintf("%s/%s/%s-%s", d.ID, channel.DeviceID, start, end)
|
||||
} else {
|
||||
ssrc[0] = '0'
|
||||
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
|
||||
}
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
|
||||
// size := 1
|
||||
// fps := 15
|
||||
// bitrate := 200
|
||||
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
|
||||
|
||||
copy(ssrc[1:6], conf.Serial[3:8])
|
||||
randNum := 1000 + rand.Intn(8999)
|
||||
copy(ssrc[6:], strconv.Itoa(randNum))
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
}
|
||||
protocol := ""
|
||||
port := conf.MediaPort
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
protocol = "TCP/"
|
||||
port = conf.MediaPort + channel.tcpPortIndex
|
||||
if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
channel.tcpPortIndex = 0
|
||||
if conf.tcpPorts.Valid {
|
||||
// opt.MediaPort, err = publisher.ListenTCP()
|
||||
// if err != nil {
|
||||
// return 500, err
|
||||
// }
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenUDP()
|
||||
if err != nil {
|
||||
code = 500
|
||||
return
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
}
|
||||
// if opt.MediaPort == 0 {
|
||||
// opt.MediaPort = conf.MediaPort
|
||||
// if conf.IsMediaNetworkTCP() {
|
||||
// protocol = "TCP/"
|
||||
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
|
||||
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
// channel.tcpPortIndex = 0
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
|
||||
"s=" + s,
|
||||
"u=" + channel.DeviceID + ":0",
|
||||
"c=IN IP4 " + d.mediaIP,
|
||||
fmt.Sprintf("t=%d %d", sint, eint),
|
||||
fmt.Sprintf("m=video %d %sRTP/AVP 96", port, protocol),
|
||||
opt.String(),
|
||||
fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
|
||||
"a=recvonly",
|
||||
"a=rtpmap:96 PS/90000",
|
||||
"y=" + string(ssrc),
|
||||
"\r\n",
|
||||
"y=" + opt.ssrc,
|
||||
"",
|
||||
}
|
||||
// if config.IsMediaNetworkTCP() {
|
||||
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
@@ -256,50 +352,46 @@ func (channel *Channel) Invite(start, end string) (code int) {
|
||||
invite.SetBody(strings.Join(sdpInfo, "\r\n"), true)
|
||||
|
||||
subject := sip.GenericHeader{
|
||||
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, ssrc, conf.Serial),
|
||||
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
|
||||
}
|
||||
invite.AppendHeader(&subject)
|
||||
response, err := d.SipRequestForResponse(invite)
|
||||
if response == nil || err != nil {
|
||||
return http.StatusRequestTimeout
|
||||
publisher.inviteRes, err = d.SipRequestForResponse(invite)
|
||||
if err != nil {
|
||||
return http.StatusRequestTimeout, err
|
||||
}
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, response.StatusCode()))
|
||||
|
||||
if response.StatusCode() == 200 {
|
||||
ds := strings.Split(response.Body(), "\r\n")
|
||||
_SSRC, _ := strconv.ParseInt(string(ssrc), 10, 0)
|
||||
SSRC := uint32(_SSRC)
|
||||
code = int(publisher.inviteRes.StatusCode())
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
|
||||
if code == 200 {
|
||||
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
|
||||
for _, l := range ds {
|
||||
if ls := strings.Split(l, "="); len(ls) > 1 {
|
||||
if ls[0] == "y" && len(ls[1]) > 0 {
|
||||
_SSRC, _ = strconv.ParseInt(ls[1], 10, 0)
|
||||
SSRC = uint32(_SSRC)
|
||||
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
|
||||
opt.SSRC = uint32(_ssrc)
|
||||
} else {
|
||||
plugin.Error("read invite response y ", zap.Error(err))
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
publisher := &GBPublisher{
|
||||
SSRC: SSRC,
|
||||
channel: channel,
|
||||
Start: start,
|
||||
End: end,
|
||||
inviteRes: &response,
|
||||
}
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
publisher.udpCache = utils.NewPqRtp()
|
||||
}
|
||||
if plugin.Publish(streamPath, publisher) != nil {
|
||||
return 403
|
||||
if err = plugin.Publish(streamPath, publisher); err != nil {
|
||||
code = 403
|
||||
return
|
||||
}
|
||||
ack := sip.NewAckRequest("", invite, response, "", nil)
|
||||
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
|
||||
srv.Send(ack)
|
||||
} else if start == "" && conf.AutoInvite {
|
||||
} else if opt.IsLive() && conf.AutoInvite {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
channel.Invite("", "")
|
||||
channel.Invite(InviteOptions{})
|
||||
})
|
||||
}
|
||||
return int(response.StatusCode())
|
||||
return
|
||||
}
|
||||
|
||||
func (channel *Channel) Bye(live bool) int {
|
||||
if live && channel.LivePublisher != nil {
|
||||
return channel.LivePublisher.Bye()
|
||||
|
||||
82
device.go
82
device.go
@@ -2,8 +2,10 @@ package gb28181
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -58,7 +60,6 @@ type Device struct {
|
||||
addr sip.Address
|
||||
sipIP string //设备对应网卡的服务器ip
|
||||
mediaIP string //设备对应网卡的服务器ip
|
||||
tx *sip.ServerTransaction
|
||||
NetAddr string
|
||||
channelMap map[string]*Channel
|
||||
channelMutex sync.RWMutex
|
||||
@@ -68,7 +69,42 @@ type Device struct {
|
||||
}
|
||||
}
|
||||
|
||||
func (config *GB28181Config) StoreDevice(id string, req sip.Request, tx *sip.ServerTransaction) {
|
||||
func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
|
||||
from, _ := req.From()
|
||||
d.addr = sip.Address{
|
||||
DisplayName: from.DisplayName,
|
||||
Uri: from.Address,
|
||||
}
|
||||
deviceIp := req.Source()
|
||||
servIp := req.Recipient().Host()
|
||||
//根据网卡ip获取对应的公网ip
|
||||
sipIP := config.routes[servIp]
|
||||
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
|
||||
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
|
||||
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
|
||||
sipIP = servIp
|
||||
}
|
||||
}
|
||||
//如果用户配置过则使用配置的
|
||||
if config.SipIP != "" {
|
||||
sipIP = config.SipIP
|
||||
} else if sipIP == "" {
|
||||
sipIP = myip.InternalIPv4()
|
||||
}
|
||||
mediaIp := sipIP
|
||||
if config.MediaIP != "" {
|
||||
mediaIp = config.MediaIP
|
||||
}
|
||||
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
|
||||
d.Status = string(sip.REGISTER)
|
||||
d.sipIP = sipIP
|
||||
d.mediaIP = mediaIp
|
||||
d.NetAddr = deviceIp
|
||||
d.UpdateTime = time.Now()
|
||||
d.channelMap = make(map[string]*Channel)
|
||||
go d.Catalog()
|
||||
}
|
||||
func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
|
||||
var d *Device
|
||||
from, _ := req.From()
|
||||
deviceAddr := sip.Address{
|
||||
@@ -111,14 +147,41 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request, tx *sip.Ser
|
||||
addr: deviceAddr,
|
||||
sipIP: sipIP,
|
||||
mediaIP: mediaIp,
|
||||
tx: tx,
|
||||
NetAddr: deviceIp,
|
||||
channelMap: make(map[string]*Channel),
|
||||
}
|
||||
Devices.Store(id, d)
|
||||
SaveDevices()
|
||||
go d.Catalog()
|
||||
}
|
||||
}
|
||||
func ReadDevices() {
|
||||
if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
|
||||
defer f.Close()
|
||||
var items []*Device
|
||||
if err = json.NewDecoder(f).Decode(&items); err == nil {
|
||||
for _, item := range items {
|
||||
if time.Since(item.UpdateTime) < time.Duration(conf.RegisterValidity)*time.Second {
|
||||
item.Status = "RECOVER"
|
||||
Devices.Store(item.ID, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func SaveDevices() {
|
||||
var item []any
|
||||
Devices.Range(func(key, value any) bool {
|
||||
item = append(item, value)
|
||||
return true
|
||||
})
|
||||
if f, err := os.OpenFile("devices.json", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
|
||||
defer f.Close()
|
||||
encoder := json.NewEncoder(f)
|
||||
encoder.SetIndent("", " ")
|
||||
encoder.Encode(item)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Device) addChannel(channel *Channel) {
|
||||
for _, c := range d.Channels {
|
||||
@@ -170,20 +233,22 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
}
|
||||
old.Copy(c)
|
||||
c = old
|
||||
} else {
|
||||
c.ChannelEx = &ChannelEx{
|
||||
device: d,
|
||||
}
|
||||
d.channelMap[c.DeviceID] = c
|
||||
}
|
||||
if conf.AutoInvite && (c.LivePublisher == nil) {
|
||||
go c.Invite("", "")
|
||||
go c.Invite(InviteOptions{})
|
||||
}
|
||||
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
|
||||
c.LiveSubSP = s.Path
|
||||
} else {
|
||||
c.LiveSubSP = ""
|
||||
}
|
||||
d.channelMap[c.DeviceID] = c
|
||||
}
|
||||
}
|
||||
func (d *Device) UpdateRecord(channelId string, list []*Record) {
|
||||
@@ -393,7 +458,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
d.channelOffline(v.DeviceID)
|
||||
case "ADD":
|
||||
plugin.Debug("收到通道新增通知")
|
||||
channel := Channel{
|
||||
channel := &Channel{
|
||||
DeviceID: v.DeviceID,
|
||||
ParentID: v.ParentID,
|
||||
Name: v.Name,
|
||||
@@ -408,11 +473,12 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
Secrecy: v.Secrecy,
|
||||
Status: v.Status,
|
||||
}
|
||||
d.addChannel(&channel)
|
||||
channels := []*Channel{channel}
|
||||
d.UpdateChannels(channels)
|
||||
case "DEL":
|
||||
//删除
|
||||
plugin.Debug("收到通道删除通知")
|
||||
delete(d.channelMap, v.DeviceID)
|
||||
d.channelOffline(v.DeviceID)
|
||||
case "UPDATE":
|
||||
plugin.Debug("收到通道更新通知")
|
||||
// 更新通道
|
||||
|
||||
10
handle.go
10
handle.go
@@ -92,7 +92,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
|
||||
}
|
||||
}
|
||||
if passAuth {
|
||||
config.StoreDevice(id, req, &tx)
|
||||
config.StoreDevice(id, req)
|
||||
DeviceNonce.Delete(id)
|
||||
DeviceRegisterCount.Delete(id)
|
||||
resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
|
||||
@@ -127,7 +127,11 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
|
||||
id := from.Address.User().String()
|
||||
if v, ok := Devices.Load(id); ok {
|
||||
d := v.(*Device)
|
||||
if d.Status == string(sip.REGISTER) {
|
||||
switch d.Status {
|
||||
case "RECOVER":
|
||||
config.RecoverDevice(d, req)
|
||||
return
|
||||
case string(sip.REGISTER):
|
||||
d.Status = "ONLINE"
|
||||
//go d.QueryDeviceInfo(req)
|
||||
}
|
||||
@@ -166,7 +170,7 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
|
||||
for _, c := range d.Channels {
|
||||
if config.AutoInvite &&
|
||||
(c.LivePublisher == nil) {
|
||||
c.Invite("", "")
|
||||
c.Invite(InviteOptions{})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
6
main.go
6
main.go
@@ -44,6 +44,7 @@ type GB28181Config struct {
|
||||
Server
|
||||
LogLevel string //trace, debug, info, warn, error, fatal, panic
|
||||
routes map[string]string
|
||||
DumpPath string //dump PS流本地文件路径
|
||||
}
|
||||
|
||||
func (c *GB28181Config) initRoutes() {
|
||||
@@ -51,13 +52,16 @@ func (c *GB28181Config) initRoutes() {
|
||||
tempIps := myip.LocalAndInternalIPs()
|
||||
for k, v := range tempIps {
|
||||
c.routes[k] = v
|
||||
c.routes[k[0:strings.LastIndex(k, ".")]] = k
|
||||
if lastdot := strings.LastIndex(k, "."); lastdot >= 0 {
|
||||
c.routes[k[0:lastdot]] = k
|
||||
}
|
||||
}
|
||||
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
|
||||
}
|
||||
func (c *GB28181Config) OnEvent(event any) {
|
||||
switch event.(type) {
|
||||
case FirstConfig:
|
||||
ReadDevices()
|
||||
go c.initRoutes()
|
||||
c.startServer()
|
||||
}
|
||||
|
||||
160
publisher.go
160
publisher.go
@@ -2,30 +2,46 @@ package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync/atomic"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"github.com/pion/rtp/v2"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
)
|
||||
|
||||
type GBPublisher struct {
|
||||
Publisher
|
||||
Start string
|
||||
End string
|
||||
SSRC uint32
|
||||
channel *Channel
|
||||
inviteRes *sip.Response
|
||||
parser *utils.DecPSPackage
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
InviteOptions
|
||||
channel *Channel
|
||||
inviteRes sip.Response
|
||||
parser *utils.DecPSPackage
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
if p.dumpPrint != nil {
|
||||
p.dumpPrint.Write([]byte(s))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) OnEvent(event any) {
|
||||
switch v := event.(type) {
|
||||
if p.channel == nil {
|
||||
p.IO.OnEvent(event)
|
||||
return
|
||||
}
|
||||
switch event.(type) {
|
||||
case IPublisher:
|
||||
if p.IsLive() {
|
||||
p.Type = "GB28181 Live"
|
||||
@@ -35,32 +51,36 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.channel.RecordPublisher = p
|
||||
}
|
||||
conf.publishers.Add(p.SSRC, p)
|
||||
if p.Equal(v) { //第一任
|
||||
|
||||
} else {
|
||||
//删除前任
|
||||
conf.publishers.Delete(v.(*GBPublisher).SSRC)
|
||||
p.Publisher.OnEvent(v)
|
||||
if err := error(nil); p.dump != "" {
|
||||
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
|
||||
p.Error("open dump file failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
case SEwaitPublish:
|
||||
//掉线自动重新拉流
|
||||
if p.IsLive() {
|
||||
atomic.StoreInt32(&p.channel.state, 0)
|
||||
p.channel.LivePublisher = nil
|
||||
go p.channel.Invite("", "")
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
go p.channel.Invite(InviteOptions{})
|
||||
}
|
||||
case SEclose, SEKick:
|
||||
if p.IsLive() {
|
||||
p.channel.LivePublisher = nil
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
} else {
|
||||
p.channel.RecordPublisher = nil
|
||||
}
|
||||
p.Publisher.OnEvent(v)
|
||||
conf.publishers.Delete(p.SSRC)
|
||||
if p.dumpFile != nil {
|
||||
p.dumpFile.Close()
|
||||
}
|
||||
p.Bye()
|
||||
default:
|
||||
p.Publisher.OnEvent(v)
|
||||
}
|
||||
p.Publisher.OnEvent(event)
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Bye() int {
|
||||
@@ -69,12 +89,11 @@ func (p *GBPublisher) Bye() int {
|
||||
return 404
|
||||
}
|
||||
defer p.Stop()
|
||||
defer atomic.StoreInt32(&p.channel.state, 0)
|
||||
p.inviteRes = nil
|
||||
bye := p.channel.CreateRequst(sip.BYE)
|
||||
from, _ := (*res).From()
|
||||
to, _ := (*res).To()
|
||||
callId, _ := (*res).CallID()
|
||||
from, _ := res.From()
|
||||
to, _ := res.To()
|
||||
callId, _ := res.CallID()
|
||||
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
@@ -86,10 +105,6 @@ func (p *GBPublisher) Bye() int {
|
||||
return int(resp.StatusCode())
|
||||
}
|
||||
|
||||
func (p *GBPublisher) IsLive() bool {
|
||||
return p.Start == ""
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
if p.VideoTrack == nil {
|
||||
switch p.parser.VideoStreamType {
|
||||
@@ -101,6 +116,10 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
return
|
||||
}
|
||||
}
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
p.VideoTrack.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
@@ -161,7 +180,11 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
|
||||
if p.parser.Len() > 0 {
|
||||
p.parser.Skip(4)
|
||||
p.PrintDump("</td></tr>")
|
||||
p.PrintDump("<tr>")
|
||||
p.parser.Read(rtp.Timestamp, p)
|
||||
p.PrintDump("</tr>")
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
p.parser.Reset()
|
||||
}
|
||||
p.parser.Write(ps)
|
||||
@@ -169,3 +192,78 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
p.parser.Write(ps)
|
||||
}
|
||||
}
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
if p.dumpPrint != nil {
|
||||
p.PrintDump(`<style type="text/css">
|
||||
.gray {
|
||||
color: gray;
|
||||
}
|
||||
</style>
|
||||
`)
|
||||
p.PrintDump("<table>")
|
||||
defer p.PrintDump("</table>")
|
||||
}
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
var t uint16
|
||||
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = f.Read(l)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
payload := make([]byte, util.ReadBE[int](l[:4]))
|
||||
t = util.ReadBE[uint16](l[4:])
|
||||
p.PrintDump(fmt.Sprintf("[<b>%d</b> %d]", t, len(payload)))
|
||||
_, err = f.Read(payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rtpPacket.Unmarshal(payload)
|
||||
p.PushPS(&rtpPacket)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
port, err = conf.udpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
if err != nil {
|
||||
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
p.SetIO(conn)
|
||||
go func() {
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", port))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
|
||||
defer conf.udpPorts.Recycle(port)
|
||||
dumpLen := make([]byte, 6)
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
if p.dumpFile != nil {
|
||||
util.PutBE(dumpLen[:4], n)
|
||||
if p.lastReceive.IsZero() {
|
||||
util.PutBE(dumpLen[4:], 0)
|
||||
} else {
|
||||
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
|
||||
}
|
||||
p.lastReceive = time.Now()
|
||||
p.dumpFile.Write(dumpLen)
|
||||
p.dumpFile.Write(ps)
|
||||
}
|
||||
p.PushPS(&rtpPacket)
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
69
restful.go
69
restful.go
@@ -2,7 +2,9 @@ package gb28181
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"m7s.live/engine/v4/util"
|
||||
@@ -31,37 +33,72 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.QueryRecord(startTime, endTime))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
|
||||
// CORS(w, r)
|
||||
id := r.URL.Query().Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
ptzcmd := r.URL.Query().Get("ptzcmd")
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Control(ptzcmd))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
// CORS(w, r)
|
||||
query := r.URL.Query()
|
||||
id := query.Get("id")
|
||||
channel := r.URL.Query().Get("channel")
|
||||
startTime := query.Get("startTime")
|
||||
endTime := query.Get("endTime")
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
if startTime == "" && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else {
|
||||
w.WriteHeader(c.Invite(startTime, endTime))
|
||||
}
|
||||
channel := query.Get("channel")
|
||||
port, _ := strconv.Atoi(query.Get("mediaPort"))
|
||||
opt := InviteOptions{
|
||||
dump: query.Get("dump"),
|
||||
MediaPort: uint16(port),
|
||||
}
|
||||
opt.Validate(query.Get("startTime"), query.Get("endTime"))
|
||||
if c := FindChannel(id, channel); c == nil {
|
||||
http.NotFound(w, r)
|
||||
} else if opt.IsLive() && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else if code, err := c.Invite(opt); err == nil {
|
||||
w.WriteHeader(code)
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.Error(w, err.Error(), code)
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
|
||||
dump := r.URL.Query().Get("dump")
|
||||
printOut := r.URL.Query().Get("print")
|
||||
if dump == "" {
|
||||
dump = conf.DumpPath
|
||||
}
|
||||
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
streamPath := dump
|
||||
if strings.HasPrefix(dump, "/") {
|
||||
streamPath = "replay" + dump
|
||||
} else {
|
||||
streamPath = "replay/" + dump
|
||||
}
|
||||
var pub GBPublisher
|
||||
pub.SetIO(f)
|
||||
if err = plugin.Publish(streamPath, &pub); err == nil {
|
||||
if printOut != "" {
|
||||
pub.dumpPrint = w
|
||||
pub.SetParentCtx(r.Context())
|
||||
err = pub.Replay(f)
|
||||
} else {
|
||||
go pub.Replay(f)
|
||||
w.Write([]byte("ok"))
|
||||
}
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +110,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Bye(live != "false"))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +131,6 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
|
||||
d := v.(*Device)
|
||||
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
73
server.go
73
server.go
@@ -23,9 +23,55 @@ import (
|
||||
|
||||
var srv gosip.Server
|
||||
|
||||
type PortManager struct {
|
||||
recycle chan uint16
|
||||
max uint16
|
||||
pos uint16
|
||||
Valid bool
|
||||
}
|
||||
|
||||
func (pm *PortManager) Init(start, end uint16) {
|
||||
pm.pos = start
|
||||
pm.max = end
|
||||
if pm.pos > 0 && pm.max > pm.pos {
|
||||
pm.Valid = true
|
||||
pm.recycle = make(chan uint16, pm.Range())
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PortManager) Range() uint16 {
|
||||
return pm.max - pm.pos
|
||||
}
|
||||
|
||||
func (pm *PortManager) Recycle(p uint16) (err error) {
|
||||
select {
|
||||
case pm.recycle <- p:
|
||||
return nil
|
||||
default:
|
||||
return io.EOF //TODO: 换一个Error
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PortManager) GetPort() (p uint16, err error) {
|
||||
select {
|
||||
case p = <-pm.recycle:
|
||||
return
|
||||
default:
|
||||
if pm.Range() > 0 {
|
||||
pm.pos++
|
||||
p = pm.pos
|
||||
return
|
||||
} else {
|
||||
return 0, io.EOF //TODO: 换一个Error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
Ignores map[string]struct{}
|
||||
publishers util.Map[uint32, *GBPublisher]
|
||||
tcpPorts PortManager
|
||||
udpPorts PortManager
|
||||
}
|
||||
|
||||
const MaxRegisterCount = 3
|
||||
@@ -82,9 +128,15 @@ func (config *GB28181Config) startServer() {
|
||||
|
||||
func (config *GB28181Config) startMediaServer() {
|
||||
if config.MediaNetwork == "tcp" {
|
||||
listenMediaTCP(config)
|
||||
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.tcpPorts.Valid {
|
||||
config.listenMediaTCP()
|
||||
}
|
||||
} else {
|
||||
listenMediaUDP(config)
|
||||
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.udpPorts.Valid {
|
||||
config.listenMediaUDP()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +162,7 @@ func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
func listenMediaTCP(config *GB28181Config) {
|
||||
func (config *GB28181Config) listenMediaTCP() {
|
||||
addr := ":" + strconv.Itoa(int(config.MediaPort))
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
@@ -132,7 +184,7 @@ func listenMediaTCP(config *GB28181Config) {
|
||||
}
|
||||
}
|
||||
|
||||
func listenMediaUDP(config *GB28181Config) {
|
||||
func (config *GB28181Config) listenMediaUDP() {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
|
||||
@@ -147,13 +199,24 @@ func listenMediaUDP(config *GB28181Config) {
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", config.MediaPort))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", config.MediaPort))
|
||||
|
||||
dumpLen := make([]byte, 6)
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
|
||||
if publisher.dumpFile != nil {
|
||||
util.PutBE(dumpLen[:4], n)
|
||||
if publisher.lastReceive.IsZero() {
|
||||
util.PutBE(dumpLen[4:], 0)
|
||||
} else {
|
||||
util.PutBE(dumpLen[4:], uint16(time.Since(publisher.lastReceive).Milliseconds()))
|
||||
}
|
||||
publisher.lastReceive = time.Now()
|
||||
publisher.dumpFile.Write(dumpLen)
|
||||
publisher.dumpFile.Write(ps)
|
||||
}
|
||||
publisher.PushPS(&rtpPacket)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,7 @@ var (
|
||||
type Pusher interface {
|
||||
PushVideo(uint32, uint32, []byte)
|
||||
PushAudio(uint32, []byte)
|
||||
PrintDump(string)
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -144,6 +145,7 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
|
||||
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
|
||||
dec.clean()
|
||||
dec.PTS = ts
|
||||
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -158,6 +160,7 @@ func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
|
||||
}
|
||||
var video []byte
|
||||
var nextStartCode uint32
|
||||
pusher.PrintDump("<td>")
|
||||
loop:
|
||||
for err == nil {
|
||||
if nextStartCode, err = dec.Uint32(); err != nil {
|
||||
@@ -165,10 +168,12 @@ loop:
|
||||
}
|
||||
switch nextStartCode {
|
||||
case StartCodeSYS:
|
||||
pusher.PrintDump("[sys]")
|
||||
dec.ReadPayload()
|
||||
//err = dec.decSystemHeader()
|
||||
case StartCodeMAP:
|
||||
err = dec.decProgramStreamMap()
|
||||
pusher.PrintDump("[map]")
|
||||
case StartCodeVideo:
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
// if len(video) == 0 {
|
||||
@@ -183,6 +188,7 @@ loop:
|
||||
} else {
|
||||
fmt.Println("video", err)
|
||||
}
|
||||
pusher.PrintDump("[video]")
|
||||
case StartCodeAudio:
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
ts := ts / 90
|
||||
@@ -190,16 +196,19 @@ loop:
|
||||
ts = dec.PTS / 90
|
||||
}
|
||||
pusher.PushAudio(ts, dec.Payload)
|
||||
pusher.PrintDump("[audio]")
|
||||
} else {
|
||||
fmt.Println("audio", err)
|
||||
}
|
||||
case StartCodePS:
|
||||
break loop
|
||||
default:
|
||||
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
|
||||
dec.ReadPayload()
|
||||
}
|
||||
}
|
||||
if len(video) > 0 {
|
||||
pusher.PrintDump("</td>")
|
||||
pusher.PushVideo(dec.PTS, dec.DTS, video)
|
||||
video = nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user