Compare commits

...

25 Commits

Author SHA1 Message Date
dexter
940d7c5e59 🐛 FIX: 优化PS解析 2022-11-17 23:47:20 +08:00
dexter
2142a474a3 🐛 FIX: 大PES分包机制 2022-11-17 11:53:36 +08:00
dexter
86e9bccb85 🐛 FIX: 增加对AAC的支持以及多slice合并发送 2022-11-13 23:30:45 +08:00
dexter
bb3a679a60 音频时间戳转换 2022-10-25 21:38:06 +08:00
dexter
ecd97c8439 🐛 FIX: tcp解析 2022-10-25 15:35:00 +08:00
dexter
bfd71a72d8 Merge pull request #74 from hongri8488/v4
添加28181 bye接口调用时将流关闭
2022-10-25 14:44:39 +08:00
dexter
c6bef8ccd8 🐛 FIX: udp范围端口增加超时功能 2022-10-25 14:44:03 +08:00
hongri8488
20c0ac52cb 添加28181 bye接口调用时将流关闭 2022-10-25 13:51:31 +08:00
dexter
34f5b7da79 🐛 FIX: 推测h264的时候取的下标错误 2022-10-24 10:54:03 +08:00
dexter
0d3a795dc2 👌 IMPROVE: 细粒度拼凑视频帧 2022-10-23 11:39:04 +08:00
dexter
b52d457990 🐛 FIX: 解决udp丢包引起的ps包组包错乱问题 2022-10-22 09:22:46 +08:00
dexter
4a214cebeb 🐛 FIX: PS解析 2022-10-21 16:16:44 +08:00
dexter
8f78f992ca Merge pull request #73 from hongri8488/v4
修改tcp请求部分设备不能拉流的问题
2022-10-21 11:52:36 +08:00
zhangdongfang
228d7b0cd2 修改tcp请求部分设备不能拉流的问题 2022-10-21 10:49:31 +08:00
dexter
e99150b0be Merge pull request #72 from hongri8488/v4
添加tcp多端口端口回收
2022-10-20 15:07:21 +08:00
zhangdongfang
31112e0052 添加tcp多端口端口回收 2022-10-20 15:03:18 +08:00
dexter
8663b8e171 Merge pull request #71 from hongri8488/v4
添加gb28181 tcp多端口
2022-10-20 14:55:32 +08:00
zhangdongfang
5960f07fc3 添加gb28181 tcp多端口 2022-10-20 14:49:31 +08:00
dexter
eb6004d6ef 🐛 FIX: 防止dts自动生成 2022-10-17 11:37:30 +08:00
dexter
cce5f67ab9 🐛 FIX: initRoutes索引越界 2022-10-13 16:23:32 +08:00
dexter
fdfb462d46 Merge pull request #70 from WXC9102/v4
更新通道panic
2022-09-26 18:52:23 +08:00
weixuechao
c05adce562 1.更新通道时会偶发panic, 2.新增/删除通道信息没有更新 2022-09-26 16:54:55 +08:00
dexter
aa3727f582 🐛 FIX: unlock时的判空 2022-09-19 00:21:48 +08:00
dexter
6e8709176e 🐛 FIX: sdp中t值序列化问题 2022-09-16 18:06:15 +08:00
dexter
3e6c43f6ff 📦 NEW: UDP多端口支持 2022-09-16 16:23:58 +08:00
9 changed files with 492 additions and 279 deletions

View File

@@ -22,7 +22,7 @@ gb28181:
prefetchrecord: false
udpcachesize: 0
sipnetwork: udp
sipip:
sipip: ""
sipport: 5060
serial: "34020000002000000001"
realm: "3402000000"
@@ -35,14 +35,15 @@ gb28181:
heartbeatinterval: 60
heartbeatretry: 3
mediaip:
mediaip: ""
mediaport: 58200
mediaidletimeout: 30
medianetwork: udp
mediaportmin: 0
meidaportmax: 0
removebaninterval: 600
loglevel: info
audioenable: true
```
- `AutoInvite` bool 表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
@@ -68,12 +69,15 @@ gb28181:
- `MediaPort` uint16 媒体服务器端口
- `MediaNetwork` string 媒体传输协议默认UDP可选TCP
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
- `MediaPortMin` uint16 媒体服务器端口范围最小值
- `MediaPortMax` uint16 媒体服务器端口范围最大值
- `AudioEnable` bool 是否开启音频
- `LogLevel` string 日志级别,默认 infotracedebuginfowarnerrorfatal, panic
- `RemoveBanInterval` int 定时移除注册失败的设备黑名单单位秒默认10分钟600秒
- `UdpCacheSize` int 表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启会最多缓存最多N个包并排序修复乱序造成的无法播放问题注意开启后会有一定的性能损耗并丢失部分包。
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
**注意某些摄像机没有设置用户名的地方摄像机会以自身的国标id作为用户名这个时候m7s会忽略使用摄像机的用户名忽略配置的用户名**
如果设备配置了错误的用户名和密码连续三次上报错误后m7s会记录设备id并在10分钟内禁止设备注册

View File

@@ -1,14 +1,16 @@
package gb28181
import (
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"sync/atomic"
"sync"
"time"
. "m7s.live/engine/v4"
"github.com/ghettovoice/gosip/sip"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/utils"
@@ -24,7 +26,7 @@ type ChannelEx struct {
RecordEndTime string
recordStartTime time.Time
recordEndTime time.Time
state int32
liveInviteLock sync.Mutex
tcpPortIndex uint16
GpsTime time.Time //gps时间
Longitude string //经度
@@ -50,6 +52,28 @@ type Channel struct {
*ChannelEx //自定义属性
}
func (c *Channel) Copy(v *Channel) {
if v == nil {
return
}
c.DeviceID = v.DeviceID
c.ParentID = v.ParentID
c.Name = v.Name
c.Manufacturer = v.Manufacturer
c.Model = v.Model
c.Owner = v.Owner
c.CivilCode = v.CivilCode
c.Address = v.Address
c.Parental = v.Parental
c.SafetyWay = v.SafetyWay
c.RegisterWay = v.RegisterWay
c.Secrecy = v.Secrecy
c.Status = v.Status
c.Status = v.Status
c.ChannelEx = v.ChannelEx
}
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := c.device
d.sn++
@@ -114,7 +138,7 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
<EndTime>%s</EndTime>
<Secrecy>0</Secrecy>
<Type>all</Type>
</Query>`, d.sn, d.ID, startTime, endTime)
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
request.SetBody(body, true)
resp, err := d.SipRequestForResponse(request)
if err != nil {
@@ -143,8 +167,8 @@ func (channel *Channel) Control(PTZCmd string) int {
}
type InviteOptions struct {
Start string
End string
Start int
End int
dump string
ssrc string
SSRC uint32
@@ -152,27 +176,36 @@ type InviteOptions struct {
}
func (o InviteOptions) IsLive() bool {
return o.Start == ""
return o.Start == 0 || o.End == 0
}
func (o InviteOptions) Record() bool {
return o.Start != ""
return !o.IsLive()
}
func (o InviteOptions) Validate() bool {
sint, err1 := strconv.ParseInt(o.Start, 10, 0)
eint, err2 := strconv.ParseInt(o.End, 10, 0)
if err1 != nil || err2 != nil {
return false
func (o *InviteOptions) Validate(start, end string) error {
if start != "" {
sint, err1 := strconv.ParseInt(start, 10, 0)
if err1 != nil {
return err1
}
o.Start = int(sint)
}
if sint >= eint {
return false
if end != "" {
eint, err2 := strconv.ParseInt(end, 10, 0)
if err2 != nil {
return err2
}
o.End = int(eint)
}
return true
if o.Start >= o.End {
return errors.New("start < end")
}
return nil
}
func (o InviteOptions) String() string {
return fmt.Sprintf("t=%s %s", o.Start, o.End)
return fmt.Sprintf("t=%d %d", o.Start, o.End)
}
func (o *InviteOptions) CreateSSRC() {
@@ -234,14 +267,14 @@ f = v/a/编码格式/码率大小/采样率
f字段中视、音频参数段之间不需空格分割。
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
*/
func (channel *Channel) Invite(opt InviteOptions) (code int) {
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
if opt.IsLive() {
if !atomic.CompareAndSwapInt32(&channel.state, 0, 1) {
return 304
if !channel.liveInviteLock.TryLock() {
return 304, nil
}
defer func() {
if code != 200 {
atomic.StoreInt32(&channel.state, 0)
channel.liveInviteLock.Unlock()
}
}()
}
@@ -251,29 +284,52 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
s := "Play"
opt.CreateSSRC()
if opt.Record() {
if !opt.Validate() {
return 400
}
s = "Playback"
streamPath = fmt.Sprintf("%s/%s/%s-%s", d.ID, channel.DeviceID, opt.Start, opt.End)
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
}
if opt.dump == "" {
opt.dump = conf.DumpPath
}
// size := 1
// fps := 15
// bitrate := 200
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
}
protocol := ""
if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
if conf.IsMediaNetworkTCP() {
protocol = "TCP/"
opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
channel.tcpPortIndex = 0
if conf.IsMediaNetworkTCP() {
protocol = "TCP/"
if conf.tcpPorts.Valid {
opt.MediaPort, err = publisher.ListenTCP()
if err != nil {
return 500, err
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
} else {
if conf.udpPorts.Valid {
opt.MediaPort, err = publisher.ListenUDP()
if err != nil {
code = 500
return
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
}
// if opt.MediaPort == 0 {
// opt.MediaPort = conf.MediaPort
// if conf.IsMediaNetworkTCP() {
// protocol = "TCP/"
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
// channel.tcpPortIndex = 0
// }
// }
// }
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
@@ -287,9 +343,9 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
"y=" + opt.ssrc,
"",
}
// if config.IsMediaNetworkTCP() {
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
// }
if conf.IsMediaNetworkTCP() {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
}
invite := channel.CreateRequst(sip.INVITE)
contentType := sip.ContentType("application/sdp")
invite.AppendHeader(&contentType)
@@ -300,14 +356,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
}
invite.AppendHeader(&subject)
response, err := d.SipRequestForResponse(invite)
if response == nil || err != nil {
return http.StatusRequestTimeout
publisher.inviteRes, err = d.SipRequestForResponse(invite)
if err != nil {
return http.StatusRequestTimeout, err
}
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, response.StatusCode()))
code = int(response.StatusCode())
code = int(publisher.inviteRes.StatusCode())
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
if code == 200 {
ds := strings.Split(response.Body(), "\r\n")
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
if ls[0] == "y" && len(ls[1]) > 0 {
@@ -320,21 +376,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
}
}
}
if opt.dump == "" {
opt.dump = conf.DumpPath
}
publisher := &GBPublisher{
InviteOptions: opt,
channel: channel,
inviteRes: &response,
}
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
publisher.udpCache = utils.NewPqRtp()
}
if plugin.Publish(streamPath, publisher) != nil {
return 403
if err = plugin.Publish(streamPath, publisher); err != nil {
code = 403
return
}
ack := sip.NewAckRequest("", invite, response, "", nil)
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
srv.Send(ack)
} else if opt.IsLive() && conf.AutoInvite {
time.AfterFunc(time.Second*5, func() {
@@ -345,6 +394,11 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
}
func (channel *Channel) Bye(live bool) int {
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
if s := Streams.Get(streamPath); s != nil {
s.Close()
}
if live && channel.LivePublisher != nil {
return channel.LivePublisher.Bye()
}

View File

@@ -233,10 +233,13 @@ func (d *Device) UpdateChannels(list []*Channel) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
old.Copy(c)
c = old
} else {
c.ChannelEx = &ChannelEx{
device: d,
}
d.channelMap[c.DeviceID] = c
}
if conf.AutoInvite && (c.LivePublisher == nil) {
go c.Invite(InviteOptions{})
@@ -246,7 +249,6 @@ func (d *Device) UpdateChannels(list []*Channel) {
} else {
c.LiveSubSP = ""
}
d.channelMap[c.DeviceID] = c
}
}
func (d *Device) UpdateRecord(channelId string, list []*Record) {
@@ -456,7 +458,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := Channel{
channel := &Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -471,11 +473,12 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addChannel(&channel)
channels := []*Channel{channel}
d.UpdateChannels(channels)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
delete(d.channelMap, v.DeviceID)
d.channelOffline(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
// 更新通道

View File

@@ -52,7 +52,9 @@ func (c *GB28181Config) initRoutes() {
tempIps := myip.LocalAndInternalIPs()
for k, v := range tempIps {
c.routes[k] = v
c.routes[k[0:strings.LastIndex(k, ".")]] = k
if lastdot := strings.LastIndex(k, "."); lastdot >= 0 {
c.routes[k[0:lastdot]] = k
}
}
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
}

View File

@@ -1,17 +1,18 @@
package gb28181
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"sync/atomic"
"time"
"github.com/ghettovoice/gosip/sip"
"github.com/pion/rtp/v2"
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
@@ -21,13 +22,14 @@ type GBPublisher struct {
Publisher
InviteOptions
channel *Channel
inviteRes *sip.Response
inviteRes sip.Response
parser *utils.DecPSPackage
lastSeq uint16
udpCache *utils.PriorityQueueRtp
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
reorder util.RTPReorder[*rtp.Packet]
}
func (p *GBPublisher) PrintDump(s string) {
@@ -41,7 +43,7 @@ func (p *GBPublisher) OnEvent(event any) {
p.IO.OnEvent(event)
return
}
switch v := event.(type) {
switch event.(type) {
case IPublisher:
if p.IsLive() {
p.Type = "GB28181 Live"
@@ -56,35 +58,31 @@ func (p *GBPublisher) OnEvent(event any) {
p.Error("open dump file failed", zap.Error(err))
}
}
if p.Equal(v) { //第一任
} else {
//删除前任
conf.publishers.Delete(v.(*GBPublisher).SSRC)
p.Publisher.OnEvent(v)
}
case SEwaitPublish:
//掉线自动重新拉流
if p.IsLive() {
atomic.StoreInt32(&p.channel.state, 0)
p.channel.LivePublisher = nil
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
p.channel.LivePublisher = nil
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
} else {
p.channel.RecordPublisher = nil
}
p.Publisher.OnEvent(v)
conf.publishers.Delete(p.SSRC)
if p.dumpFile != nil {
p.dumpFile.Close()
}
p.Bye()
default:
p.Publisher.OnEvent(v)
}
p.Publisher.OnEvent(event)
}
func (p *GBPublisher) Bye() int {
@@ -93,12 +91,11 @@ func (p *GBPublisher) Bye() int {
return 404
}
defer p.Stop()
defer atomic.StoreInt32(&p.channel.state, 0)
p.inviteRes = nil
bye := p.channel.CreateRequst(sip.BYE)
from, _ := (*res).From()
to, _ := (*res).To()
callId, _ := (*res).CallID()
from, _ := res.From()
to, _ := res.To()
callId, _ := res.CallID()
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
@@ -113,87 +110,86 @@ func (p *GBPublisher) Bye() int {
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
if p.VideoTrack == nil {
switch p.parser.VideoStreamType {
case utils.StreamTypeH264:
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream)
case utils.StreamTypeH265:
case mpegts.STREAM_TYPE_H265:
p.VideoTrack = NewH265(p.Publisher.Stream)
default:
return
//推测编码类型
var maybe264 H264NALUType
maybe264 = maybe264.Parse(payload[4])
switch maybe264 {
case NALU_Non_IDR_Picture,
NALU_IDR_Picture,
NALU_SEI,
NALU_SPS,
NALU_PPS,
NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
p.VideoTrack = NewH265(p.Publisher.Stream)
}
}
}
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
if len(payload) > 10 {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
} else {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
}
if dts == 0 {
dts = pts
}
p.VideoTrack.WriteAnnexB(pts, dts, payload)
}
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
if p.AudioTrack == nil {
switch p.parser.AudioStreamType {
case utils.G711A:
case mpegts.STREAM_TYPE_G711A:
at := NewG711(p.Publisher.Stream, true)
at.Audio.SampleRate = 8000
at.Audio.SampleSize = 16
at.Channels = 1
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
p.AudioTrack = at
case utils.G711A + 1:
case mpegts.STREAM_TYPE_G711U:
at := NewG711(p.Publisher.Stream, false)
at.Audio.SampleRate = 8000
at.Audio.SampleSize = 16
at.Channels = 1
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
p.AudioTrack = at
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(payload[:7])
default:
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
return
}
} else {
p.AudioTrack.WriteRaw(ts, payload)
}
p.AudioTrack.WriteAVCC(ts, payload)
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
originRtp := *rtp
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
//序号小于第一个包的丢弃,rtp包序号达到65535后会从0开始所以这里需要判断一下
if rtp.SequenceNumber < p.lastSeq && p.lastSeq-rtp.SequenceNumber < utils.MaxRtpDiff {
return
}
p.udpCache.Push(*rtp)
rtpTmp, _ := p.udpCache.Pop()
rtp = &rtpTmp
}
ps := rtp.Payload
if p.lastSeq != 0 {
// rtp序号不连续丢弃PS
if p.lastSeq+1 != rtp.SequenceNumber {
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
if p.udpCache.Len() < conf.UdpCacheSize {
p.udpCache.Push(*rtp)
return
} else {
p.udpCache.Empty()
rtp = &originRtp // 还原rtp包而不是使用缓存中避免rtp序号断裂
}
}
p.parser.Reset()
}
}
p.lastSeq = rtp.SequenceNumber
if p.parser == nil {
p.parser = new(utils.DecPSPackage)
p.parser = utils.NewDecPSPackage(p)
}
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
if p.parser.Len() > 0 {
p.parser.Skip(4)
p.PrintDump("</td></tr>")
p.PrintDump("<tr>")
p.parser.Read(rtp.Timestamp, p)
p.PrintDump("</tr>")
p.PrintDump("<tr class=gray><td colspan=12>")
p.parser.Reset()
if conf.IsMediaNetworkTCP() {
p.parser.Feed(rtp)
p.lastSeq = rtp.SequenceNumber
} else {
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
if rtp.SequenceNumber != p.lastSeq+1 {
p.parser.Drop()
}
p.parser.Feed(rtp)
p.lastSeq = rtp.SequenceNumber
}
p.parser.Write(ps)
} else if p.parser.Len() > 0 {
p.parser.Write(ps)
}
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()
@@ -207,7 +203,6 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
p.PrintDump("<table>")
defer p.PrintDump("</table>")
}
p.PrintDump("<tr class=gray><td colspan=12>")
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
@@ -226,3 +221,79 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
}
return
}
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
var rtpPacket rtp.Packet
networkBuffer := 1048576
port, err = conf.udpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
conf.udpPorts.Recycle(port)
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
defer conn.Close()
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
plugin.Error("Decode rtp error:", zap.Error(err))
}
if p.dumpFile != nil {
util.PutBE(dumpLen[:4], n)
if p.lastReceive.IsZero() {
util.PutBE(dumpLen[4:], 0)
} else {
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
}
p.lastReceive = time.Now()
p.dumpFile.Write(dumpLen)
p.dumpFile.Write(ps)
}
p.PushPS(&rtpPacket)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
}
}()
return
}
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
port, err = conf.tcpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
defer conf.tcpPorts.Recycle(port)
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
go func() {
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
defer conf.tcpPorts.Recycle(port)
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
conn, err := listen.Accept()
listen.Close()
p.SetIO(conn)
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
return
}
processTcpMediaConn(conf, conn)
}()
return
}

View File

@@ -33,7 +33,7 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.QueryRecord(startTime, endTime))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
@@ -44,7 +44,7 @@ func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Control(ptzcmd))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
@@ -54,19 +54,18 @@ func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
channel := query.Get("channel")
port, _ := strconv.Atoi(query.Get("mediaPort"))
opt := InviteOptions{
query.Get("startTime"),
query.Get("endTime"),
query.Get("dump"),
"", 0, uint16(port),
dump: query.Get("dump"),
MediaPort: uint16(port),
}
if c := FindChannel(id, channel); c != nil {
if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else {
w.WriteHeader(c.Invite(opt))
}
opt.Validate(query.Get("startTime"), query.Get("endTime"))
if c := FindChannel(id, channel); c == nil {
http.NotFound(w, r)
} else if opt.IsLive() && c.LivePublisher != nil {
w.WriteHeader(304) //直播流已存在
} else if code, err := c.Invite(opt); err == nil {
w.WriteHeader(code)
} else {
w.WriteHeader(404)
http.Error(w, err.Error(), code)
}
}
@@ -111,7 +110,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
if c := FindChannel(id, channel); c != nil {
w.WriteHeader(c.Bye(live != "false"))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}
@@ -132,6 +131,6 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
d := v.(*Device)
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
} else {
w.WriteHeader(404)
http.NotFound(w, r)
}
}

View File

@@ -23,9 +23,55 @@ import (
var srv gosip.Server
type PortManager struct {
recycle chan uint16
max uint16
pos uint16
Valid bool
}
func (pm *PortManager) Init(start, end uint16) {
pm.pos = start
pm.max = end
if pm.pos > 0 && pm.max > pm.pos {
pm.Valid = true
pm.recycle = make(chan uint16, pm.Range())
}
}
func (pm *PortManager) Range() uint16 {
return pm.max - pm.pos
}
func (pm *PortManager) Recycle(p uint16) (err error) {
select {
case pm.recycle <- p:
return nil
default:
return io.EOF //TODO: 换一个Error
}
}
func (pm *PortManager) GetPort() (p uint16, err error) {
select {
case p = <-pm.recycle:
return
default:
if pm.Range() > 0 {
pm.pos++
p = pm.pos
return
} else {
return 0, io.EOF //TODO: 换一个Error
}
}
}
type Server struct {
Ignores map[string]struct{}
publishers util.Map[uint32, *GBPublisher]
tcpPorts PortManager
udpPorts PortManager
}
const MaxRegisterCount = 3
@@ -82,9 +128,15 @@ func (config *GB28181Config) startServer() {
func (config *GB28181Config) startMediaServer() {
if config.MediaNetwork == "tcp" {
listenMediaTCP(config)
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.tcpPorts.Valid {
config.listenMediaTCP()
}
} else {
listenMediaUDP(config)
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
if !config.udpPorts.Valid {
config.listenMediaUDP()
}
}
}
@@ -110,7 +162,7 @@ func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
}
}
func listenMediaTCP(config *GB28181Config) {
func (config *GB28181Config) listenMediaTCP() {
addr := ":" + strconv.Itoa(int(config.MediaPort))
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
@@ -132,7 +184,7 @@ func listenMediaTCP(config *GB28181Config) {
}
}
func listenMediaUDP(config *GB28181Config) {
func (config *GB28181Config) listenMediaUDP() {
var rtpPacket rtp.Packet
networkBuffer := 1048576

View File

@@ -60,6 +60,7 @@ func (b *IOBuffer) ReadByte() (byte, error) {
b.off++
return c, nil
}
func (b *IOBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
@@ -81,20 +82,24 @@ func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
var ErrTooLarge = errors.New("IOBuffer: too large")
func (b *IOBuffer) Write(p []byte) (n int, err error) {
defer func() {
if recover() != nil {
panic(ErrTooLarge)
}
}()
l := len(p)
oldLen := len(b.buf)
m, ok := b.tryGrowByReslice(l)
if !ok {
buf := make([]byte, oldLen+l)
copy(buf, b.buf[b.off:])
m = oldLen - b.off
b.off = 0
b.buf = buf
}
return copy(b.buf[m:], p), nil
l := copy(b.buf, b.buf[b.off:])
b.buf = append(b.buf[:l], p...)
b.off = 0
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
return len(p), nil
// defer func() {
// if recover() != nil {
// panic(ErrTooLarge)
// }
// }()
// l := len(p)
// oldLen := len(b.buf)
// m, ok := b.tryGrowByReslice(l)
// if !ok {
// m = oldLen - b.off
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
// b.off = 0
// b.buf = buf
// }
// return copy(b.buf[m:], p), nil
}

View File

@@ -4,31 +4,23 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/pion/rtp/v2"
)
//
const (
UDPTransfer int = 0
TCPTransferActive int = 1
TCPTransferPassive int = 2
LocalCache int = 3
StreamTypeH264 = 0x1b
StreamTypeH265 = 0x24
G711A = 0x90 //PCMA
G7221AUDIOTYPE = 0x92
G7231AUDIOTYPE = 0x93
G729AUDIOTYPE = 0x99
StreamIDVideo = 0xe0
StreamIDAudio = 0xc0
StartCodePS = 0x000001ba
StartCodeSYS = 0x000001bb
StartCodeMAP = 0x000001bc
StartCodeVideo = 0x000001e0
StartCodeAudio = 0x000001c0
HaiKangCode = 0x000001bd
PrivateStreamCode = 0x000001bd
MEPGProgramEndCode = 0x000001b9
RTPHeaderLength int = 12
@@ -119,11 +111,21 @@ type DecPSPackage struct {
VideoStreamType uint32
AudioStreamType uint32
IOBuffer
Payload []byte
PTS uint32
DTS uint32
Payload []byte
Lack int //缺少字节数
videoBuffer []byte
audioBuffer []byte
PTS uint32
DTS uint32
Pusher
}
func NewDecPSPackage(p Pusher) *DecPSPackage {
p.PrintDump("<tr><td>")
return &DecPSPackage{
Pusher: p,
}
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
@@ -138,116 +140,139 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
if err != nil {
return
}
return dec.ReadN(int(payloadlen))
if l := int(payloadlen); dec.Len() >= l {
dec.Lack = 0
return dec.Next(l), nil
} else {
dec.Lack = l - dec.Len()
}
return dec.Next(dec.Len()), io.EOF
}
//read the buffer and push video or audio
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
dec.clean()
dec.PTS = ts
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
if err := dec.Skip(9); err != nil {
return err
}
// Drop 由于丢包引起的必须丢弃的数据
func (dec *DecPSPackage) Drop() {
dec.Reset()
dec.videoBuffer = nil
dec.audioBuffer = nil
dec.Payload = nil
}
psl, err := dec.ReadByte()
if err != nil {
return err
func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
ps := rtp.Payload
if len(ps) < 4 {
return nil
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
var video []byte
var nextStartCode uint32
pusher.PrintDump("<td>")
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
break
switch binary.BigEndian.Uint32(ps) {
case StartCodePS, StartCodeSYS, StartCodeMAP, StartCodeVideo, StartCodeAudio, PrivateStreamCode, MEPGProgramEndCode:
defer dec.Write(ps)
if dec.Len() >= 4 {
//说明需要处理PS包处理完后清空缓存
defer dec.Reset()
} else {
return
}
switch nextStartCode {
case StartCodeSYS:
pusher.PrintDump("[sys]")
dec.ReadPayload()
//err = dec.decSystemHeader()
case StartCodeMAP:
err = dec.decProgramStreamMap()
pusher.PrintDump("[map]")
case StartCodeVideo:
if err = dec.decPESPacket(); err == nil {
// if len(video) == 0 {
// if dec.PTS == 0 {
// dec.PTS = ts
// }
// // if dec.DTS == 0 {
// // dec.DTS = dec.PTS
// // }
// }
video = append(video, dec.Payload...)
} else {
fmt.Println("video", err)
default:
// 说明是中间数据,直接写入缓存,否则数据不合法需要丢弃
if dec.Len() > 0 {
dec.Write(ps)
}
return nil
}
for dec.Len() >= 4 {
code, _ := dec.Uint32()
// println("code:", code)
switch code {
case StartCodePS:
dec.PrintDump("</td></tr><tr><td>")
if len(dec.audioBuffer) > 0 {
dec.PushAudio(dec.PTS, dec.audioBuffer)
dec.audioBuffer = nil
}
pusher.PrintDump("[video]")
if err := dec.Skip(9); err != nil {
return err
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
if len(dec.videoBuffer) > 0 {
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
dec.videoBuffer = nil
}
case StartCodeSYS:
dec.PrintDump("</td><td>[sys]")
dec.ReadPayload()
case StartCodeMAP:
dec.decProgramStreamMap()
dec.PrintDump("</td><td>[map]")
case StartCodeVideo:
if dec.videoBuffer == nil {
dec.PrintDump("</td><td>")
}
err = dec.decPESPacket()
// if err != nil {
//说明还有后续数据,需要继续处理
// println(rtp.SequenceNumber)
// }
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
dec.PrintDump("[video]")
case StartCodeAudio:
if dec.audioBuffer == nil {
dec.PrintDump("</td><td>")
}
if err = dec.decPESPacket(); err == nil {
ts := ts / 90
if dec.PTS != 0 {
ts = dec.PTS / 90
}
pusher.PushAudio(ts, dec.Payload)
pusher.PrintDump("[audio]")
dec.audioBuffer = append(dec.audioBuffer, dec.Payload...)
dec.PrintDump("[audio]")
} else {
fmt.Println("audio", err)
}
case StartCodePS:
break loop
default:
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
case PrivateStreamCode:
dec.ReadPayload()
dec.PrintDump("</td></tr><tr><td>[ac3]")
case MEPGProgramEndCode:
dec.PrintDump("</td></tr>")
return io.EOF
default:
fmt.Println("unknow code", code)
return ErrParsePakcet
}
}
if len(video) > 0 {
pusher.PrintDump("</td>")
pusher.PushVideo(dec.PTS, dec.DTS, video)
video = nil
}
if nextStartCode == StartCodePS {
// fmt.Println(aurora.Red("StartCodePS recursion..."), err)
return dec.Read(ts, pusher)
}
return err
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
*/
func (dec *DecPSPackage) decProgramStreamMap() error {
psm, err := dec.ReadPayload()
@@ -287,9 +312,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
func (dec *DecPSPackage) decPESPacket() error {
payload, err := dec.ReadPayload()
if err != nil {
return err
}
if len(payload) < 4 {
return errors.New("not enough data")
}