Compare commits

...

13 Commits

Author SHA1 Message Date
dexter
eb6004d6ef 🐛 FIX: 防止dts自动生成 2022-10-17 11:37:30 +08:00
dexter
cce5f67ab9 🐛 FIX: initRoutes索引越界 2022-10-13 16:23:32 +08:00
dexter
fdfb462d46 Merge pull request #70 from WXC9102/v4
更新通道panic
2022-09-26 18:52:23 +08:00
weixuechao
c05adce562 1.更新通道时会偶发panic, 2.新增/删除通道信息没有更新 2022-09-26 16:54:55 +08:00
dexter
aa3727f582 🐛 FIX: unlock时的判空 2022-09-19 00:21:48 +08:00
dexter
6e8709176e 🐛 FIX: sdp中t值序列化问题 2022-09-16 18:06:15 +08:00
dexter
3e6c43f6ff 📦 NEW: UDP多端口支持 2022-09-16 16:23:58 +08:00
dexter
4a7aa94bd2 🐛 FIX: PTZCmd报文传递DeviceID为通道ID 2022-09-16 13:29:11 +08:00
dexter
fd13c6d9ab 🐛 FIX: InviteOptions.String方法格式化类型不匹配 2022-09-14 17:44:09 +08:00
dexter
085d413d2b 📦 NEW: 增加设备信息写盘重启快速恢复连接 2022-09-12 22:56:04 +08:00
dexter
4eba0e23f9 👌 IMPROVE: 增加网页显示dump文件的详细数据展示 2022-09-11 15:10:21 +08:00
dexter
f0324c4283 📦 NEW: 添加dump功能 2022-09-10 00:14:53 +08:00
dexter
f6b5f15b83 🐛 FIX: 查询录像接口文档错误 2022-09-06 19:15:08 +08:00
9 changed files with 499 additions and 126 deletions

View File

@@ -156,7 +156,7 @@ ptzcmd|是|PTZ控制指令
### 查询录像
`/gb28181/api/query/records`
`/gb28181/api/records`
参数名 | 必传 | 含义
|----|---|---

View File

@@ -1,15 +1,17 @@
package gb28181
import (
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"sync/atomic"
"sync"
"time"
"github.com/ghettovoice/gosip/sip"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/utils"
)
@@ -23,7 +25,7 @@ type ChannelEx struct {
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
state int32
liveInviteLock sync.Mutex
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
@@ -49,6 +51,28 @@ type Channel struct {
*ChannelEx //自定义属性
}
func (c *Channel) Copy(v *Channel) {
if v == nil {
return
}
c.DeviceID = v.DeviceID
c.ParentID = v.ParentID
c.Name = v.Name
c.Manufacturer = v.Manufacturer
c.Model = v.Model
c.Owner = v.Owner
c.CivilCode = v.CivilCode
c.Address = v.Address
c.Parental = v.Parental
c.SafetyWay = v.SafetyWay
c.RegisterWay = v.RegisterWay
c.Secrecy = v.Secrecy
c.Status = v.Status
c.Status = v.Status
c.ChannelEx = v.ChannelEx
}
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := c.device
d.sn++
@@ -113,7 +137,7 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
<EndTime>%s</EndTime>
<Secrecy>0</Secrecy>
<Type>all</Type>
</Query>`, d.sn, d.ID, startTime, endTime)
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
request.SetBody(body, true)
resp, err := d.SipRequestForResponse(request)
if err != nil {
@@ -132,7 +156,7 @@ func (channel *Channel) Control(PTZCmd string) int {
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<PTZCmd>%s</PTZCmd>
</Control>`, d.sn, d.ID, PTZCmd)
</Control>`, d.sn, channel.DeviceID, PTZCmd)
request.SetBody(body, true)
resp, err := d.SipRequestForResponse(request)
if err != nil {
@@ -141,6 +165,63 @@ func (channel *Channel) Control(PTZCmd string) int {
return int(resp.StatusCode())
}
type InviteOptions struct {
Start int
End int
dump string
ssrc string
SSRC uint32
MediaPort uint16
}
func (o InviteOptions) IsLive() bool {
return o.Start == 0 || o.End == 0
}
func (o InviteOptions) Record() bool {
return !o.IsLive()
}
func (o *InviteOptions) Validate(start, end string) error {
if start != "" {
sint, err1 := strconv.ParseInt(start, 10, 0)
if err1 != nil {
return err1
}
o.Start = int(sint)
}
if end != "" {
eint, err2 := strconv.ParseInt(end, 10, 0)
if err2 != nil {
return err2
}
o.End = int(eint)
}
if o.Start >= o.End {
return errors.New("start < end")
}
return nil
}
func (o InviteOptions) String() string {
return fmt.Sprintf("t=%d %d", o.Start, o.End)
}
func (o *InviteOptions) CreateSSRC() {
ssrc := make([]byte, 10)
if o.IsLive() {
ssrc[0] = '0'
} else {
ssrc[0] = '1'
}
copy(ssrc[1:6], conf.Serial[3:8])
randNum := 1000 + rand.Intn(8999)
copy(ssrc[6:], strconv.Itoa(randNum))
o.ssrc = string(ssrc)
_ssrc, _ := strconv.ParseInt(o.ssrc, 10, 0)
o.SSRC = uint32(_ssrc)
}
/*
f字段 f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
各项具体含义:
@@ -185,66 +266,81 @@ f = v/a/编码格式/码率大小/采样率
f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/
func (channel *Channel) Invite(start, end string) (code int) {
if start == "" {
if !atomic.CompareAndSwapInt32(&channel.state, 0, 1) {
return 304
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
if opt.IsLive() {
if !channel.liveInviteLock.TryLock() {
return 304, nil
}
defer func() {
if code != 200 {
atomic.StoreInt32(&channel.state, 0)
channel.liveInviteLock.Unlock()
}
}()
channel.Bye(true)
} else {
channel.Bye(false)
}
sint, err1 := strconv.ParseInt(start, 10, 0)
eint, err2 := strconv.ParseInt(end, 10, 0)
channel.Bye(opt.IsLive())
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
s := "Play"
ssrc := make([]byte, 10)
if start != "" {
if err1 != nil || err2 != nil {
return 400
}
opt.CreateSSRC()
if opt.Record() {
s = "Playback"
ssrc[0] = '1'
streamPath = fmt.Sprintf("%s/%s/%s-%s", d.ID, channel.DeviceID, start, end)
} else {
ssrc[0] = '0'
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
}
if opt.dump == "" {
opt.dump = conf.DumpPath
}
// size := 1
// fps := 15
// bitrate := 200
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
copy(ssrc[1:6], conf.Serial[3:8])
randNum := 1000 + rand.Intn(8999)
copy(ssrc[6:], strconv.Itoa(randNum))
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
}
protocol := ""
port := conf.MediaPort
if conf.IsMediaNetworkTCP() {
protocol = "TCP/"
port = conf.MediaPort + channel.tcpPortIndex
if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
channel.tcpPortIndex = 0
if conf.tcpPorts.Valid {
// opt.MediaPort, err = publisher.ListenTCP()
// if err != nil {
// return 500, err
// }
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
} else {
if conf.udpPorts.Valid {
opt.MediaPort, err = publisher.ListenUDP()
if err != nil {
code = 500
return
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
}
// if opt.MediaPort == 0 {
// opt.MediaPort = conf.MediaPort
// if conf.IsMediaNetworkTCP() {
// protocol = "TCP/"
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
// channel.tcpPortIndex = 0
// }
// }
// }
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
"s=" + s,
"u=" + channel.DeviceID + ":0",
"c=IN IP4 " + d.mediaIP,
fmt.Sprintf("t=%d %d", sint, eint),
fmt.Sprintf("m=video %d %sRTP/AVP 96", port, protocol),
opt.String(),
fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
"a=recvonly",
"a=rtpmap:96 PS/90000",
"y=" + string(ssrc),
"\r\n",
"y=" + opt.ssrc,
"",
}
// if config.IsMediaNetworkTCP() {
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
@@ -256,50 +352,46 @@ func (channel *Channel) Invite(start, end string) (code int) {
invite.SetBody(strings.Join(sdpInfo, "\r\n"), true)
subject := sip.GenericHeader{
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, ssrc, conf.Serial),
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
}
invite.AppendHeader(&subject)
response, err := d.SipRequestForResponse(invite)
if response == nil || err != nil {
return http.StatusRequestTimeout
publisher.inviteRes, err = d.SipRequestForResponse(invite)
if err != nil {
return http.StatusRequestTimeout, err
}
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, response.StatusCode()))
if response.StatusCode() == 200 {
ds := strings.Split(response.Body(), "\r\n")
_SSRC, _ := strconv.ParseInt(string(ssrc), 10, 0)
SSRC := uint32(_SSRC)
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
if code == 200 {
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
if ls[0] == "y" && len(ls[1]) > 0 {
_SSRC, _ = strconv.ParseInt(ls[1], 10, 0)
SSRC = uint32(_SSRC)
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
opt.SSRC = uint32(_ssrc)
} else {
plugin.Error("read invite response y ", zap.Error(err))
}
break
}
}
}
publisher := &GBPublisher{
SSRC: SSRC,
channel: channel,
Start: start,
End: end,
inviteRes: &response,
}
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
publisher.udpCache = utils.NewPqRtp()
}
if plugin.Publish(streamPath, publisher) != nil {
return 403
if err = plugin.Publish(streamPath, publisher); err != nil {
code = 403
return
}
ack := sip.NewAckRequest("", invite, response, "", nil)
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
srv.Send(ack)
} else if start == "" && conf.AutoInvite {
} else if opt.IsLive() && conf.AutoInvite {
time.AfterFunc(time.Second*5, func() {
channel.Invite("", "")
channel.Invite(InviteOptions{})
})
}
return int(response.StatusCode())
return
}
func (channel *Channel) Bye(live bool) int {
if live && channel.LivePublisher != nil {
return channel.LivePublisher.Bye()

View File

@@ -2,8 +2,10 @@ package gb28181
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"
@@ -58,7 +60,6 @@ type Device struct {
addr sip.Address
sipIP string //设备对应网卡的服务器ip
mediaIP string //设备对应网卡的服务器ip
tx *sip.ServerTransaction
NetAddr string
channelMap map[string]*Channel
channelMutex sync.RWMutex
@@ -68,7 +69,42 @@ type Device struct {
}
}
func (config *GB28181Config) StoreDevice(id string, req sip.Request, tx *sip.ServerTransaction) {
func (config *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
from, _ := req.From()
d.addr = sip.Address{
DisplayName: from.DisplayName,
Uri: from.Address,
}
deviceIp := req.Source()
servIp := req.Recipient().Host()
//根据网卡ip获取对应的公网ip
sipIP := config.routes[servIp]
//如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
sipIP = servIp
}
}
//如果用户配置过则使用配置的
if config.SipIP != "" {
sipIP = config.SipIP
} else if sipIP == "" {
sipIP = myip.InternalIPv4()
}
mediaIp := sipIP
if config.MediaIP != "" {
mediaIp = config.MediaIP
}
plugin.Info("RecoverDevice", zap.String("id", d.ID), zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
d.Status = string(sip.REGISTER)
d.sipIP = sipIP
d.mediaIP = mediaIp
d.NetAddr = deviceIp
d.UpdateTime = time.Now()
d.channelMap = make(map[string]*Channel)
go d.Catalog()
}
func (config *GB28181Config) StoreDevice(id string, req sip.Request) {
var d *Device
from, _ := req.From()
deviceAddr := sip.Address{
@@ -111,14 +147,41 @@ func (config *GB28181Config) StoreDevice(id string, req sip.Request, tx *sip.Ser
addr: deviceAddr,
sipIP: sipIP,
mediaIP: mediaIp,
tx: tx,
NetAddr: deviceIp,
channelMap: make(map[string]*Channel),
}
Devices.Store(id, d)
SaveDevices()
go d.Catalog()
}
}
func ReadDevices() {
if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
defer f.Close()
var items []*Device
if err = json.NewDecoder(f).Decode(&items); err == nil {
for _, item := range items {
if time.Since(item.UpdateTime) < time.Duration(conf.RegisterValidity)*time.Second {
item.Status = "RECOVER"
Devices.Store(item.ID, item)
}
}
}
}
}
func SaveDevices() {
var item []any
Devices.Range(func(key, value any) bool {
item = append(item, value)
return true
})
if f, err := os.OpenFile("devices.json", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
defer f.Close()
encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")
encoder.Encode(item)
}
}
func (d *Device) addChannel(channel *Channel) {
for _, c := range d.Channels {
@@ -170,20 +233,22 @@ func (d *Device) UpdateChannels(list []*Channel) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
old.Copy(c)
c = old
} else {
c.ChannelEx = &ChannelEx{
device: d,
}
d.channelMap[c.DeviceID] = c
}
if conf.AutoInvite && (c.LivePublisher == nil) {
go c.Invite("", "")
go c.Invite(InviteOptions{})
}
if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
c.LiveSubSP = s.Path
} else {
c.LiveSubSP = ""
}
d.channelMap[c.DeviceID] = c
}
}
func (d *Device) UpdateRecord(channelId string, list []*Record) {
@@ -393,7 +458,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := Channel{
channel := &Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -408,11 +473,12 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addChannel(&channel)
channels := []*Channel{channel}
d.UpdateChannels(channels)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
delete(d.channelMap, v.DeviceID)
d.channelOffline(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
// 更新通道

View File

@@ -92,7 +92,7 @@ func (config *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransactio
}
}
if passAuth {
config.StoreDevice(id, req, &tx)
config.StoreDevice(id, req)
DeviceNonce.Delete(id)
DeviceRegisterCount.Delete(id)
resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
@@ -127,7 +127,11 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
id := from.Address.User().String()
if v, ok := Devices.Load(id); ok {
d := v.(*Device)
if d.Status == string(sip.REGISTER) {
switch d.Status {
case "RECOVER":
config.RecoverDevice(d, req)
return
case string(sip.REGISTER):
d.Status = "ONLINE"
//go d.QueryDeviceInfo(req)
}
@@ -166,7 +170,7 @@ func (config *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction
for _, c := range d.Channels {
if config.AutoInvite &&
(c.LivePublisher == nil) {
c.Invite("", "")
c.Invite(InviteOptions{})
}
}
}

View File

@@ -44,6 +44,7 @@ type GB28181Config struct {
Server
LogLevel string //trace, debug, info, warn, error, fatal, panic
routes map[string]string
DumpPath string //dump PS流本地文件路径
}
func (c *GB28181Config) initRoutes() {
@@ -51,13 +52,16 @@ func (c *GB28181Config) initRoutes() {
tempIps := myip.LocalAndInternalIPs()
for k, v := range tempIps {
c.routes[k] = v
c.routes[k[0:strings.LastIndex(k, ".")]] = k
if lastdot := strings.LastIndex(k, "."); lastdot >= 0 {
c.routes[k[0:lastdot]] = k
}
}
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
}
func (c *GB28181Config) OnEvent(event any) {
switch event.(type) {
case FirstConfig:
ReadDevices()
go c.initRoutes()
c.startServer()
}

View File

@@ -2,30 +2,46 @@ package gb28181
import (
"encoding/binary"
"sync/atomic"
"fmt"
"io"
"net"
"os"
"time"
"github.com/ghettovoice/gosip/sip"
"github.com/pion/rtp/v2"
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
)
type GBPublisher struct {
Publisher
Start string
End string
SSRC uint32
channel *Channel
inviteRes *sip.Response
parser *utils.DecPSPackage
lastSeq uint16
udpCache *utils.PriorityQueueRtp
InviteOptions
channel *Channel
inviteRes sip.Response
parser *utils.DecPSPackage
lastSeq uint16
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
}
func (p *GBPublisher) PrintDump(s string) {
if p.dumpPrint != nil {
p.dumpPrint.Write([]byte(s))
}
}
func (p *GBPublisher) OnEvent(event any) {
switch v := event.(type) {
if p.channel == nil {
p.IO.OnEvent(event)
return
}
switch event.(type) {
case IPublisher:
if p.IsLive() {
p.Type = "GB28181 Live"
@@ -35,32 +51,36 @@ func (p *GBPublisher) OnEvent(event any) {
p.channel.RecordPublisher = p
}
conf.publishers.Add(p.SSRC, p)
if p.Equal(v) { //第一任
} else {
//删除前任
conf.publishers.Delete(v.(*GBPublisher).SSRC)
p.Publisher.OnEvent(v)
if err := error(nil); p.dump != "" {
if p.dumpFile, err = os.OpenFile(p.dump, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
p.Error("open dump file failed", zap.Error(err))
}
}
case SEwaitPublish:
//掉线自动重新拉流
if p.IsLive() {
atomic.StoreInt32(&p.channel.state, 0)
p.channel.LivePublisher = nil
go p.channel.Invite("", "")
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
p.channel.LivePublisher = nil
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
} else {
p.channel.RecordPublisher = nil
}
p.Publisher.OnEvent(v)
conf.publishers.Delete(p.SSRC)
if p.dumpFile != nil {
p.dumpFile.Close()
}
p.Bye()
default:
p.Publisher.OnEvent(v)
}
p.Publisher.OnEvent(event)
}
func (p *GBPublisher) Bye() int {
@@ -69,12 +89,11 @@ func (p *GBPublisher) Bye() int {
return 404
}
defer p.Stop()
defer atomic.StoreInt32(&p.channel.state, 0)
p.inviteRes = nil
bye := p.channel.CreateRequst(sip.BYE)
from, _ := (*res).From()
to, _ := (*res).To()
callId, _ := (*res).CallID()
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
@@ -86,10 +105,6 @@ func (p *GBPublisher) Bye() int {
return int(resp.StatusCode())
}
func (p *GBPublisher) IsLive() bool {
return p.Start == ""
}
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
if p.VideoTrack == nil {
switch p.parser.VideoStreamType {
@@ -101,6 +116,10 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
return
}
}
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
if dts == 0 {
dts = pts
}
p.VideoTrack.WriteAnnexB(pts, dts, payload)
}
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
@@ -161,7 +180,11 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
if p.parser.Len() > 0 {
p.parser.Skip(4)
p.PrintDump("</td></tr>")
p.PrintDump("<tr>")
p.parser.Read(rtp.Timestamp, p)
p.PrintDump("</tr>")
p.PrintDump("<tr class=gray><td colspan=12>")
p.parser.Reset()
}
p.parser.Write(ps)
@@ -169,3 +192,78 @@ func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
p.parser.Write(ps)
}
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()
if p.dumpPrint != nil {
p.PrintDump(`<style type="text/css">
.gray {
color: gray;
}
</style>
`)
p.PrintDump("<table>")
defer p.PrintDump("</table>")
}
p.PrintDump("<tr class=gray><td colspan=12>")
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
if err != nil {
return
}
payload := make([]byte, util.ReadBE[int](l[:4]))
t = util.ReadBE[uint16](l[4:])
p.PrintDump(fmt.Sprintf("[<b>%d</b> %d]", t, len(payload)))
_, err = f.Read(payload)
if err != nil {
return
}
rtpPacket.Unmarshal(payload)
p.PushPS(&rtpPacket)
}
return
}
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
var rtpPacket rtp.Packet
networkBuffer := 1048576
port, err = conf.udpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
p.PushPS(&rtpPacket)
}
}()
return
}

View File

@@ -2,7 +2,9 @@ package gb28181
import (
"net/http"
"os"
"strconv"
"strings"
"time"
"m7s.live/engine/v4/util"
@@ -31,37 +33,72 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.QueryRecord(startTime, endTime))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
// CORS(w, r)
id := r.URL.Query().Get("id")
channel := r.URL.Query().Get("channel")
ptzcmd := r.URL.Query().Get("ptzcmd")
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Control(ptzcmd))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
// CORS(w, r)
query := r.URL.Query()
id := query.Get("id")
channel := r.URL.Query().Get("channel")
startTime := query.Get("startTime")
endTime := query.Get("endTime")
if c := FindChannel(id, channel); c != nil {
if startTime == "" && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else {
w.WriteHeader(c.Invite(startTime, endTime))
}
channel := query.Get("channel")
port, _ := strconv.Atoi(query.Get("mediaPort"))
opt := InviteOptions{
dump: query.Get("dump"),
MediaPort: uint16(port),
}
opt.Validate(query.Get("startTime"), query.Get("endTime"))
if c := FindChannel(id, channel); c == nil {
http.NotFound(w, r)
} else if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else if code, err := c.Invite(opt); err == nil {
w.WriteHeader(code)
} else {
w.WriteHeader(404)
http.Error(w, err.Error(), code)
}
}
func (conf *GB28181Config) API_replay(w http.ResponseWriter, r *http.Request) {
dump := r.URL.Query().Get("dump")
printOut := r.URL.Query().Get("print")
if dump == "" {
dump = conf.DumpPath
}
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
streamPath := dump
if strings.HasPrefix(dump, "/") {
streamPath = "replay" + dump
} else {
streamPath = "replay/" + dump
}
var pub GBPublisher
pub.SetIO(f)
if err = plugin.Publish(streamPath, &pub); err == nil {
if printOut != "" {
pub.dumpPrint = w
pub.SetParentCtx(r.Context())
err = pub.Replay(f)
} else {
go pub.Replay(f)
w.Write([]byte("ok"))
}
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
@@ -73,7 +110,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Bye(live != "false"))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
@@ -94,6 +131,6 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
d := v.(*Device)
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}

View File

@@ -23,9 +23,55 @@ import (
var srv gosip.Server
type PortManager struct {
recycle chan uint16
max uint16
pos uint16
Valid bool
}
func (pm *PortManager) Init(start, end uint16) {
pm.pos = start
pm.max = end
if pm.pos > 0 && pm.max > pm.pos {
pm.Valid = true
pm.recycle = make(chan uint16, pm.Range())
}
}
func (pm *PortManager) Range() uint16 {
return pm.max - pm.pos
}
func (pm *PortManager) Recycle(p uint16) (err error) {
select {
case pm.recycle <- p:
return nil
default:
return io.EOF //TODO: 换一个Error
}
}
func (pm *PortManager) GetPort() (p uint16, err error) {
select {
case p = <-pm.recycle:
return
default:
if pm.Range() > 0 {
pm.pos++
p = pm.pos
return
} else {
return 0, io.EOF //TODO: 换一个Error
}
}
}
type Server struct {
Ignores map[string]struct{}
publishers util.Map[uint32, *GBPublisher]
tcpPorts PortManager
udpPorts PortManager
}
const MaxRegisterCount = 3
@@ -82,9 +128,15 @@ func (config *GB28181Config) startServer() {
func (config *GB28181Config) startMediaServer() {
if config.MediaNetwork == "tcp" {
listenMediaTCP(config)
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.tcpPorts.Valid {
config.listenMediaTCP()
}
} else {
listenMediaUDP(config)
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.udpPorts.Valid {
config.listenMediaUDP()
}
}
}
@@ -110,7 +162,7 @@ func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
}
}
func listenMediaTCP(config *GB28181Config) {
func (config *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(config.MediaPort))
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
@@ -132,7 +184,7 @@ func listenMediaTCP(config *GB28181Config) {
}
}
func listenMediaUDP(config *GB28181Config) {
func (config *GB28181Config) listenMediaUDP() {
var rtpPacket rtp.Packet
networkBuffer := 1048576
@@ -147,13 +199,24 @@ func listenMediaUDP(config *GB28181Config) {
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", config.MediaPort))
defer plugin.Info("Media udp server stop", zap.Uint16("port", config.MediaPort))
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
if publisher := config.publishers.Get(rtpPacket.SSRC); publisher != nil && publisher.Publisher.Err() == nil {
if publisher.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if publisher.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(publisher.lastReceive).Milliseconds()))
}
publisher.lastReceive = time.Now()
publisher.dumpFile.Write(dumpLen)
publisher.dumpFile.Write(ps)
}
publisher.PushPS(&rtpPacket)
}
}

View File

@@ -51,6 +51,7 @@ var (
type Pusher interface {
PushVideo(uint32, uint32, []byte)
PushAudio(uint32, []byte)
PrintDump(string)
}
/*
@@ -144,6 +145,7 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
dec.clean()
dec.PTS = ts
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
if err := dec.Skip(9); err != nil {
return err
}
@@ -158,6 +160,7 @@ func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
}
var video []byte
var nextStartCode uint32
pusher.PrintDump("<td>")
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
@@ -165,10 +168,12 @@ loop:
}
switch nextStartCode {
case StartCodeSYS:
pusher.PrintDump("[sys]")
dec.ReadPayload()
//err = dec.decSystemHeader()
case StartCodeMAP:
err = dec.decProgramStreamMap()
pusher.PrintDump("[map]")
case StartCodeVideo:
if err = dec.decPESPacket(); err == nil {
// if len(video) == 0 {
@@ -183,6 +188,7 @@ loop:
} else {
fmt.Println("video", err)
}
pusher.PrintDump("[video]")
case StartCodeAudio:
if err = dec.decPESPacket(); err == nil {
ts := ts / 90
@@ -190,16 +196,19 @@ loop:
ts = dec.PTS / 90
}
pusher.PushAudio(ts, dec.Payload)
pusher.PrintDump("[audio]")
} else {
fmt.Println("audio", err)
}
case StartCodePS:
break loop
default:
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
dec.ReadPayload()
}
}
if len(video) > 0 {
pusher.PrintDump("</td>")
pusher.PushVideo(dec.PTS, dec.DTS, video)
video = nil
}