mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
940d7c5e59 | ||
|
|
2142a474a3 | ||
|
|
86e9bccb85 | ||
|
|
bb3a679a60 | ||
|
|
ecd97c8439 | ||
|
|
bfd71a72d8 | ||
|
|
c6bef8ccd8 | ||
|
|
20c0ac52cb | ||
|
|
34f5b7da79 | ||
|
|
0d3a795dc2 | ||
|
|
b52d457990 | ||
|
|
4a214cebeb | ||
|
|
8f78f992ca | ||
|
|
228d7b0cd2 | ||
|
|
e99150b0be | ||
|
|
31112e0052 | ||
|
|
8663b8e171 | ||
|
|
5960f07fc3 | ||
|
|
eb6004d6ef | ||
|
|
cce5f67ab9 | ||
|
|
fdfb462d46 | ||
|
|
c05adce562 | ||
|
|
aa3727f582 | ||
|
|
6e8709176e | ||
|
|
3e6c43f6ff |
14
README.md
14
README.md
@@ -22,7 +22,7 @@ gb28181:
|
||||
prefetchrecord: false
|
||||
udpcachesize: 0
|
||||
sipnetwork: udp
|
||||
sipip:
|
||||
sipip: ""
|
||||
sipport: 5060
|
||||
serial: "34020000002000000001"
|
||||
realm: "3402000000"
|
||||
@@ -35,14 +35,15 @@ gb28181:
|
||||
heartbeatinterval: 60
|
||||
heartbeatretry: 3
|
||||
|
||||
mediaip:
|
||||
mediaip: ""
|
||||
mediaport: 58200
|
||||
mediaidletimeout: 30
|
||||
medianetwork: udp
|
||||
|
||||
mediaportmin: 0
|
||||
meidaportmax: 0
|
||||
|
||||
removebaninterval: 600
|
||||
loglevel: info
|
||||
audioenable: true
|
||||
```
|
||||
|
||||
- `AutoInvite` bool 表示自动发起invite,当Server(SIP)接收到设备信息时,立即向设备发送invite命令获取流
|
||||
@@ -68,12 +69,15 @@ gb28181:
|
||||
- `MediaPort` uint16 媒体服务器端口
|
||||
- `MediaNetwork` string 媒体传输协议,默认UDP,可选TCP
|
||||
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
- `MediaPortMin` uint16 媒体服务器端口范围最小值
|
||||
- `MediaPortMax` uint16 媒体服务器端口范围最大值
|
||||
- `AudioEnable` bool 是否开启音频
|
||||
- `LogLevel` string 日志级别,默认 info(trace,debug,info,warn,error,fatal, panic)
|
||||
- `RemoveBanInterval` int 定时移除注册失败的设备黑名单,单位秒,默认10分钟(600秒)
|
||||
- `UdpCacheSize` int 表示UDP缓存大小,默认为0,不开启。仅当TCP关闭,切缓存大于0时才开启,会最多缓存最多N个包,并排序,修复乱序造成的无法播放问题,注意开启后,会有一定的性能损耗,并丢失部分包。
|
||||
|
||||
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
|
||||
|
||||
**注意某些摄像机没有设置用户名的地方,摄像机会以自身的国标id作为用户名,这个时候m7s会忽略使用摄像机的用户名,忽略配置的用户名**
|
||||
如果设备配置了错误的用户名和密码,连续三次上报错误后,m7s会记录设备id,并在10分钟内禁止设备注册
|
||||
|
||||
|
||||
160
channel.go
160
channel.go
@@ -1,14 +1,16 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "m7s.live/engine/v4"
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -24,7 +26,7 @@ type ChannelEx struct {
|
||||
RecordEndTime string
|
||||
recordStartTime time.Time
|
||||
recordEndTime time.Time
|
||||
state int32
|
||||
liveInviteLock sync.Mutex
|
||||
tcpPortIndex uint16
|
||||
GpsTime time.Time //gps时间
|
||||
Longitude string //经度
|
||||
@@ -50,6 +52,28 @@ type Channel struct {
|
||||
*ChannelEx //自定义属性
|
||||
}
|
||||
|
||||
func (c *Channel) Copy(v *Channel) {
|
||||
if v == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.DeviceID = v.DeviceID
|
||||
c.ParentID = v.ParentID
|
||||
c.Name = v.Name
|
||||
c.Manufacturer = v.Manufacturer
|
||||
c.Model = v.Model
|
||||
c.Owner = v.Owner
|
||||
c.CivilCode = v.CivilCode
|
||||
c.Address = v.Address
|
||||
c.Parental = v.Parental
|
||||
c.SafetyWay = v.SafetyWay
|
||||
c.RegisterWay = v.RegisterWay
|
||||
c.Secrecy = v.Secrecy
|
||||
c.Status = v.Status
|
||||
c.Status = v.Status
|
||||
c.ChannelEx = v.ChannelEx
|
||||
}
|
||||
|
||||
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
|
||||
d := c.device
|
||||
d.sn++
|
||||
@@ -114,7 +138,7 @@ func (channel *Channel) QueryRecord(startTime, endTime string) int {
|
||||
<EndTime>%s</EndTime>
|
||||
<Secrecy>0</Secrecy>
|
||||
<Type>all</Type>
|
||||
</Query>`, d.sn, d.ID, startTime, endTime)
|
||||
</Query>`, d.sn, channel.DeviceID, startTime, endTime)
|
||||
request.SetBody(body, true)
|
||||
resp, err := d.SipRequestForResponse(request)
|
||||
if err != nil {
|
||||
@@ -143,8 +167,8 @@ func (channel *Channel) Control(PTZCmd string) int {
|
||||
}
|
||||
|
||||
type InviteOptions struct {
|
||||
Start string
|
||||
End string
|
||||
Start int
|
||||
End int
|
||||
dump string
|
||||
ssrc string
|
||||
SSRC uint32
|
||||
@@ -152,27 +176,36 @@ type InviteOptions struct {
|
||||
}
|
||||
|
||||
func (o InviteOptions) IsLive() bool {
|
||||
return o.Start == ""
|
||||
return o.Start == 0 || o.End == 0
|
||||
}
|
||||
|
||||
func (o InviteOptions) Record() bool {
|
||||
return o.Start != ""
|
||||
return !o.IsLive()
|
||||
}
|
||||
|
||||
func (o InviteOptions) Validate() bool {
|
||||
sint, err1 := strconv.ParseInt(o.Start, 10, 0)
|
||||
eint, err2 := strconv.ParseInt(o.End, 10, 0)
|
||||
if err1 != nil || err2 != nil {
|
||||
return false
|
||||
func (o *InviteOptions) Validate(start, end string) error {
|
||||
if start != "" {
|
||||
sint, err1 := strconv.ParseInt(start, 10, 0)
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
o.Start = int(sint)
|
||||
}
|
||||
if sint >= eint {
|
||||
return false
|
||||
if end != "" {
|
||||
eint, err2 := strconv.ParseInt(end, 10, 0)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
o.End = int(eint)
|
||||
}
|
||||
return true
|
||||
if o.Start >= o.End {
|
||||
return errors.New("start < end")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o InviteOptions) String() string {
|
||||
return fmt.Sprintf("t=%s %s", o.Start, o.End)
|
||||
return fmt.Sprintf("t=%d %d", o.Start, o.End)
|
||||
}
|
||||
|
||||
func (o *InviteOptions) CreateSSRC() {
|
||||
@@ -234,14 +267,14 @@ f = v/a/编码格式/码率大小/采样率
|
||||
f字段中视、音频参数段之间不需空格分割。
|
||||
可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
|
||||
*/
|
||||
func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if opt.IsLive() {
|
||||
if !atomic.CompareAndSwapInt32(&channel.state, 0, 1) {
|
||||
return 304
|
||||
if !channel.liveInviteLock.TryLock() {
|
||||
return 304, nil
|
||||
}
|
||||
defer func() {
|
||||
if code != 200 {
|
||||
atomic.StoreInt32(&channel.state, 0)
|
||||
channel.liveInviteLock.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -251,29 +284,52 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
s := "Play"
|
||||
opt.CreateSSRC()
|
||||
if opt.Record() {
|
||||
if !opt.Validate() {
|
||||
return 400
|
||||
}
|
||||
s = "Playback"
|
||||
streamPath = fmt.Sprintf("%s/%s/%s-%s", d.ID, channel.DeviceID, opt.Start, opt.End)
|
||||
streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
|
||||
}
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
|
||||
// size := 1
|
||||
// fps := 15
|
||||
// bitrate := 200
|
||||
// fmt.Sprintf("f=v/2/%d/%d/1/%da///", size, fps, bitrate)
|
||||
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
}
|
||||
protocol := ""
|
||||
if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
protocol = "TCP/"
|
||||
opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
|
||||
if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
channel.tcpPortIndex = 0
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
protocol = "TCP/"
|
||||
if conf.tcpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenTCP()
|
||||
if err != nil {
|
||||
return 500, err
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
} else {
|
||||
if conf.udpPorts.Valid {
|
||||
opt.MediaPort, err = publisher.ListenUDP()
|
||||
if err != nil {
|
||||
code = 500
|
||||
return
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
}
|
||||
// if opt.MediaPort == 0 {
|
||||
// opt.MediaPort = conf.MediaPort
|
||||
// if conf.IsMediaNetworkTCP() {
|
||||
// protocol = "TCP/"
|
||||
// opt.MediaPort = conf.MediaPort + channel.tcpPortIndex
|
||||
// if channel.tcpPortIndex++; channel.tcpPortIndex >= conf.MediaPortMax {
|
||||
// channel.tcpPortIndex = 0
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
sdpInfo := []string{
|
||||
"v=0",
|
||||
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.mediaIP),
|
||||
@@ -287,9 +343,9 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
"y=" + opt.ssrc,
|
||||
"",
|
||||
}
|
||||
// if config.IsMediaNetworkTCP() {
|
||||
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
// }
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
}
|
||||
invite := channel.CreateRequst(sip.INVITE)
|
||||
contentType := sip.ContentType("application/sdp")
|
||||
invite.AppendHeader(&contentType)
|
||||
@@ -300,14 +356,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
|
||||
}
|
||||
invite.AppendHeader(&subject)
|
||||
response, err := d.SipRequestForResponse(invite)
|
||||
if response == nil || err != nil {
|
||||
return http.StatusRequestTimeout
|
||||
publisher.inviteRes, err = d.SipRequestForResponse(invite)
|
||||
if err != nil {
|
||||
return http.StatusRequestTimeout, err
|
||||
}
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, response.StatusCode()))
|
||||
code = int(response.StatusCode())
|
||||
code = int(publisher.inviteRes.StatusCode())
|
||||
plugin.Info(fmt.Sprintf("Channel :%s invite response status code: %d", channel.DeviceID, code))
|
||||
if code == 200 {
|
||||
ds := strings.Split(response.Body(), "\r\n")
|
||||
ds := strings.Split(publisher.inviteRes.Body(), "\r\n")
|
||||
for _, l := range ds {
|
||||
if ls := strings.Split(l, "="); len(ls) > 1 {
|
||||
if ls[0] == "y" && len(ls[1]) > 0 {
|
||||
@@ -320,21 +376,14 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if opt.dump == "" {
|
||||
opt.dump = conf.DumpPath
|
||||
}
|
||||
publisher := &GBPublisher{
|
||||
InviteOptions: opt,
|
||||
channel: channel,
|
||||
inviteRes: &response,
|
||||
}
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
publisher.udpCache = utils.NewPqRtp()
|
||||
}
|
||||
if plugin.Publish(streamPath, publisher) != nil {
|
||||
return 403
|
||||
if err = plugin.Publish(streamPath, publisher); err != nil {
|
||||
code = 403
|
||||
return
|
||||
}
|
||||
ack := sip.NewAckRequest("", invite, response, "", nil)
|
||||
ack := sip.NewAckRequest("", invite, publisher.inviteRes, "", nil)
|
||||
srv.Send(ack)
|
||||
} else if opt.IsLive() && conf.AutoInvite {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
@@ -345,6 +394,11 @@ func (channel *Channel) Invite(opt InviteOptions) (code int) {
|
||||
}
|
||||
|
||||
func (channel *Channel) Bye(live bool) int {
|
||||
d := channel.device
|
||||
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Close()
|
||||
}
|
||||
if live && channel.LivePublisher != nil {
|
||||
return channel.LivePublisher.Bye()
|
||||
}
|
||||
|
||||
11
device.go
11
device.go
@@ -233,10 +233,13 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
|
||||
}
|
||||
}
|
||||
old.Copy(c)
|
||||
c = old
|
||||
} else {
|
||||
c.ChannelEx = &ChannelEx{
|
||||
device: d,
|
||||
}
|
||||
d.channelMap[c.DeviceID] = c
|
||||
}
|
||||
if conf.AutoInvite && (c.LivePublisher == nil) {
|
||||
go c.Invite(InviteOptions{})
|
||||
@@ -246,7 +249,6 @@ func (d *Device) UpdateChannels(list []*Channel) {
|
||||
} else {
|
||||
c.LiveSubSP = ""
|
||||
}
|
||||
d.channelMap[c.DeviceID] = c
|
||||
}
|
||||
}
|
||||
func (d *Device) UpdateRecord(channelId string, list []*Record) {
|
||||
@@ -456,7 +458,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
d.channelOffline(v.DeviceID)
|
||||
case "ADD":
|
||||
plugin.Debug("收到通道新增通知")
|
||||
channel := Channel{
|
||||
channel := &Channel{
|
||||
DeviceID: v.DeviceID,
|
||||
ParentID: v.ParentID,
|
||||
Name: v.Name,
|
||||
@@ -471,11 +473,12 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
|
||||
Secrecy: v.Secrecy,
|
||||
Status: v.Status,
|
||||
}
|
||||
d.addChannel(&channel)
|
||||
channels := []*Channel{channel}
|
||||
d.UpdateChannels(channels)
|
||||
case "DEL":
|
||||
//删除
|
||||
plugin.Debug("收到通道删除通知")
|
||||
delete(d.channelMap, v.DeviceID)
|
||||
d.channelOffline(v.DeviceID)
|
||||
case "UPDATE":
|
||||
plugin.Debug("收到通道更新通知")
|
||||
// 更新通道
|
||||
|
||||
4
main.go
4
main.go
@@ -52,7 +52,9 @@ func (c *GB28181Config) initRoutes() {
|
||||
tempIps := myip.LocalAndInternalIPs()
|
||||
for k, v := range tempIps {
|
||||
c.routes[k] = v
|
||||
c.routes[k[0:strings.LastIndex(k, ".")]] = k
|
||||
if lastdot := strings.LastIndex(k, "."); lastdot >= 0 {
|
||||
c.routes[k[0:lastdot]] = k
|
||||
}
|
||||
}
|
||||
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
|
||||
}
|
||||
|
||||
209
publisher.go
209
publisher.go
@@ -1,17 +1,18 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"github.com/pion/rtp/v2"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -21,13 +22,14 @@ type GBPublisher struct {
|
||||
Publisher
|
||||
InviteOptions
|
||||
channel *Channel
|
||||
inviteRes *sip.Response
|
||||
inviteRes sip.Response
|
||||
parser *utils.DecPSPackage
|
||||
lastSeq uint16
|
||||
udpCache *utils.PriorityQueueRtp
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
reorder util.RTPReorder[*rtp.Packet]
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
@@ -41,7 +43,7 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.IO.OnEvent(event)
|
||||
return
|
||||
}
|
||||
switch v := event.(type) {
|
||||
switch event.(type) {
|
||||
case IPublisher:
|
||||
if p.IsLive() {
|
||||
p.Type = "GB28181 Live"
|
||||
@@ -56,35 +58,31 @@ func (p *GBPublisher) OnEvent(event any) {
|
||||
p.Error("open dump file failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if p.Equal(v) { //第一任
|
||||
|
||||
} else {
|
||||
//删除前任
|
||||
conf.publishers.Delete(v.(*GBPublisher).SSRC)
|
||||
p.Publisher.OnEvent(v)
|
||||
}
|
||||
case SEwaitPublish:
|
||||
//掉线自动重新拉流
|
||||
if p.IsLive() {
|
||||
atomic.StoreInt32(&p.channel.state, 0)
|
||||
p.channel.LivePublisher = nil
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
go p.channel.Invite(InviteOptions{})
|
||||
}
|
||||
case SEclose, SEKick:
|
||||
if p.IsLive() {
|
||||
p.channel.LivePublisher = nil
|
||||
if p.channel.LivePublisher != nil {
|
||||
p.channel.LivePublisher = nil
|
||||
p.channel.liveInviteLock.Unlock()
|
||||
}
|
||||
} else {
|
||||
p.channel.RecordPublisher = nil
|
||||
}
|
||||
p.Publisher.OnEvent(v)
|
||||
conf.publishers.Delete(p.SSRC)
|
||||
if p.dumpFile != nil {
|
||||
p.dumpFile.Close()
|
||||
}
|
||||
p.Bye()
|
||||
default:
|
||||
p.Publisher.OnEvent(v)
|
||||
}
|
||||
p.Publisher.OnEvent(event)
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Bye() int {
|
||||
@@ -93,12 +91,11 @@ func (p *GBPublisher) Bye() int {
|
||||
return 404
|
||||
}
|
||||
defer p.Stop()
|
||||
defer atomic.StoreInt32(&p.channel.state, 0)
|
||||
p.inviteRes = nil
|
||||
bye := p.channel.CreateRequst(sip.BYE)
|
||||
from, _ := (*res).From()
|
||||
to, _ := (*res).To()
|
||||
callId, _ := (*res).CallID()
|
||||
from, _ := res.From()
|
||||
to, _ := res.To()
|
||||
callId, _ := res.CallID()
|
||||
bye.ReplaceHeaders(from.Name(), []sip.Header{from})
|
||||
bye.ReplaceHeaders(to.Name(), []sip.Header{to})
|
||||
bye.ReplaceHeaders(callId.Name(), []sip.Header{callId})
|
||||
@@ -113,87 +110,86 @@ func (p *GBPublisher) Bye() int {
|
||||
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
if p.VideoTrack == nil {
|
||||
switch p.parser.VideoStreamType {
|
||||
case utils.StreamTypeH264:
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
case utils.StreamTypeH265:
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
default:
|
||||
return
|
||||
//推测编码类型
|
||||
var maybe264 H264NALUType
|
||||
maybe264 = maybe264.Parse(payload[4])
|
||||
switch maybe264 {
|
||||
case NALU_Non_IDR_Picture,
|
||||
NALU_IDR_Picture,
|
||||
NALU_SEI,
|
||||
NALU_SPS,
|
||||
NALU_PPS,
|
||||
NALU_Access_Unit_Delimiter:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
default:
|
||||
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
if len(payload) > 10 {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
} else {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
|
||||
}
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
p.VideoTrack.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
if p.AudioTrack == nil {
|
||||
switch p.parser.AudioStreamType {
|
||||
case utils.G711A:
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
at := NewG711(p.Publisher.Stream, true)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case utils.G711A + 1:
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
at := NewG711(p.Publisher.Stream, false)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(payload[:7])
|
||||
default:
|
||||
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
p.AudioTrack.WriteRaw(ts, payload)
|
||||
}
|
||||
p.AudioTrack.WriteAVCC(ts, payload)
|
||||
}
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
originRtp := *rtp
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
//序号小于第一个包的丢弃,rtp包序号达到65535后会从0开始,所以这里需要判断一下
|
||||
if rtp.SequenceNumber < p.lastSeq && p.lastSeq-rtp.SequenceNumber < utils.MaxRtpDiff {
|
||||
return
|
||||
}
|
||||
p.udpCache.Push(*rtp)
|
||||
rtpTmp, _ := p.udpCache.Pop()
|
||||
rtp = &rtpTmp
|
||||
}
|
||||
ps := rtp.Payload
|
||||
if p.lastSeq != 0 {
|
||||
// rtp序号不连续,丢弃PS
|
||||
if p.lastSeq+1 != rtp.SequenceNumber {
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
if p.udpCache.Len() < conf.UdpCacheSize {
|
||||
p.udpCache.Push(*rtp)
|
||||
return
|
||||
} else {
|
||||
p.udpCache.Empty()
|
||||
rtp = &originRtp // 还原rtp包,而不是使用缓存中,避免rtp序号断裂
|
||||
}
|
||||
}
|
||||
p.parser.Reset()
|
||||
}
|
||||
}
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
if p.parser == nil {
|
||||
p.parser = new(utils.DecPSPackage)
|
||||
p.parser = utils.NewDecPSPackage(p)
|
||||
}
|
||||
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
|
||||
if p.parser.Len() > 0 {
|
||||
p.parser.Skip(4)
|
||||
p.PrintDump("</td></tr>")
|
||||
p.PrintDump("<tr>")
|
||||
p.parser.Read(rtp.Timestamp, p)
|
||||
p.PrintDump("</tr>")
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
p.parser.Reset()
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
|
||||
if rtp.SequenceNumber != p.lastSeq+1 {
|
||||
p.parser.Drop()
|
||||
}
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
}
|
||||
p.parser.Write(ps)
|
||||
} else if p.parser.Len() > 0 {
|
||||
p.parser.Write(ps)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
@@ -207,7 +203,6 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
p.PrintDump("<table>")
|
||||
defer p.PrintDump("</table>")
|
||||
}
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
var t uint16
|
||||
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = f.Read(l)
|
||||
@@ -226,3 +221,79 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ListenUDP() (port uint16, err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
port, err = conf.udpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
if err != nil {
|
||||
conf.udpPorts.Recycle(port)
|
||||
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
p.SetIO(conn)
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", port))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
|
||||
defer conf.udpPorts.Recycle(port)
|
||||
dumpLen := make([]byte, 6)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
plugin.Error("Decode rtp error:", zap.Error(err))
|
||||
}
|
||||
if p.dumpFile != nil {
|
||||
util.PutBE(dumpLen[:4], n)
|
||||
if p.lastReceive.IsZero() {
|
||||
util.PutBE(dumpLen[4:], 0)
|
||||
} else {
|
||||
util.PutBE(dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
|
||||
}
|
||||
p.lastReceive = time.Now()
|
||||
p.dumpFile.Write(dumpLen)
|
||||
p.dumpFile.Write(ps)
|
||||
}
|
||||
p.PushPS(&rtpPacket)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
|
||||
port, err = conf.tcpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
if err != nil {
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
go func() {
|
||||
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
|
||||
conn, err := listen.Accept()
|
||||
listen.Close()
|
||||
p.SetIO(conn)
|
||||
if err != nil {
|
||||
plugin.Error("Accept err=", zap.Error(err))
|
||||
return
|
||||
}
|
||||
processTcpMediaConn(conf, conn)
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
29
restful.go
29
restful.go
@@ -33,7 +33,7 @@ func (conf *GB28181Config) API_records(w http.ResponseWriter, r *http.Request) {
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.QueryRecord(startTime, endTime))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ func (conf *GB28181Config) API_control(w http.ResponseWriter, r *http.Request) {
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Control(ptzcmd))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,19 +54,18 @@ func (conf *GB28181Config) API_invite(w http.ResponseWriter, r *http.Request) {
|
||||
channel := query.Get("channel")
|
||||
port, _ := strconv.Atoi(query.Get("mediaPort"))
|
||||
opt := InviteOptions{
|
||||
query.Get("startTime"),
|
||||
query.Get("endTime"),
|
||||
query.Get("dump"),
|
||||
"", 0, uint16(port),
|
||||
dump: query.Get("dump"),
|
||||
MediaPort: uint16(port),
|
||||
}
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
if opt.IsLive() && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else {
|
||||
w.WriteHeader(c.Invite(opt))
|
||||
}
|
||||
opt.Validate(query.Get("startTime"), query.Get("endTime"))
|
||||
if c := FindChannel(id, channel); c == nil {
|
||||
http.NotFound(w, r)
|
||||
} else if opt.IsLive() && c.LivePublisher != nil {
|
||||
w.WriteHeader(304) //直播流已存在
|
||||
} else if code, err := c.Invite(opt); err == nil {
|
||||
w.WriteHeader(code)
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.Error(w, err.Error(), code)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +110,7 @@ func (conf *GB28181Config) API_bye(w http.ResponseWriter, r *http.Request) {
|
||||
if c := FindChannel(id, channel); c != nil {
|
||||
w.WriteHeader(c.Bye(live != "false"))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,6 +131,6 @@ func (conf *GB28181Config) API_position(w http.ResponseWriter, r *http.Request)
|
||||
d := v.(*Device)
|
||||
w.WriteHeader(d.MobilePositionSubscribe(id, expiresInt, intervalInt))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
60
server.go
60
server.go
@@ -23,9 +23,55 @@ import (
|
||||
|
||||
var srv gosip.Server
|
||||
|
||||
type PortManager struct {
|
||||
recycle chan uint16
|
||||
max uint16
|
||||
pos uint16
|
||||
Valid bool
|
||||
}
|
||||
|
||||
func (pm *PortManager) Init(start, end uint16) {
|
||||
pm.pos = start
|
||||
pm.max = end
|
||||
if pm.pos > 0 && pm.max > pm.pos {
|
||||
pm.Valid = true
|
||||
pm.recycle = make(chan uint16, pm.Range())
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PortManager) Range() uint16 {
|
||||
return pm.max - pm.pos
|
||||
}
|
||||
|
||||
func (pm *PortManager) Recycle(p uint16) (err error) {
|
||||
select {
|
||||
case pm.recycle <- p:
|
||||
return nil
|
||||
default:
|
||||
return io.EOF //TODO: 换一个Error
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PortManager) GetPort() (p uint16, err error) {
|
||||
select {
|
||||
case p = <-pm.recycle:
|
||||
return
|
||||
default:
|
||||
if pm.Range() > 0 {
|
||||
pm.pos++
|
||||
p = pm.pos
|
||||
return
|
||||
} else {
|
||||
return 0, io.EOF //TODO: 换一个Error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
Ignores map[string]struct{}
|
||||
publishers util.Map[uint32, *GBPublisher]
|
||||
tcpPorts PortManager
|
||||
udpPorts PortManager
|
||||
}
|
||||
|
||||
const MaxRegisterCount = 3
|
||||
@@ -82,9 +128,15 @@ func (config *GB28181Config) startServer() {
|
||||
|
||||
func (config *GB28181Config) startMediaServer() {
|
||||
if config.MediaNetwork == "tcp" {
|
||||
listenMediaTCP(config)
|
||||
config.tcpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.tcpPorts.Valid {
|
||||
config.listenMediaTCP()
|
||||
}
|
||||
} else {
|
||||
listenMediaUDP(config)
|
||||
config.udpPorts.Init(config.MediaPortMin, config.MediaPortMax)
|
||||
if !config.udpPorts.Valid {
|
||||
config.listenMediaUDP()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +162,7 @@ func processTcpMediaConn(config *GB28181Config, conn net.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
func listenMediaTCP(config *GB28181Config) {
|
||||
func (config *GB28181Config) listenMediaTCP() {
|
||||
addr := ":" + strconv.Itoa(int(config.MediaPort))
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
@@ -132,7 +184,7 @@ func listenMediaTCP(config *GB28181Config) {
|
||||
}
|
||||
}
|
||||
|
||||
func listenMediaUDP(config *GB28181Config) {
|
||||
func (config *GB28181Config) listenMediaUDP() {
|
||||
var rtpPacket rtp.Packet
|
||||
networkBuffer := 1048576
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ func (b *IOBuffer) ReadByte() (byte, error) {
|
||||
b.off++
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Reset() {
|
||||
b.buf = b.buf[:0]
|
||||
b.off = 0
|
||||
@@ -81,20 +82,24 @@ func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
|
||||
var ErrTooLarge = errors.New("IOBuffer: too large")
|
||||
|
||||
func (b *IOBuffer) Write(p []byte) (n int, err error) {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
panic(ErrTooLarge)
|
||||
}
|
||||
}()
|
||||
l := len(p)
|
||||
oldLen := len(b.buf)
|
||||
m, ok := b.tryGrowByReslice(l)
|
||||
if !ok {
|
||||
buf := make([]byte, oldLen+l)
|
||||
copy(buf, b.buf[b.off:])
|
||||
m = oldLen - b.off
|
||||
b.off = 0
|
||||
b.buf = buf
|
||||
}
|
||||
return copy(b.buf[m:], p), nil
|
||||
l := copy(b.buf, b.buf[b.off:])
|
||||
b.buf = append(b.buf[:l], p...)
|
||||
b.off = 0
|
||||
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
|
||||
return len(p), nil
|
||||
// defer func() {
|
||||
// if recover() != nil {
|
||||
// panic(ErrTooLarge)
|
||||
// }
|
||||
// }()
|
||||
// l := len(p)
|
||||
// oldLen := len(b.buf)
|
||||
// m, ok := b.tryGrowByReslice(l)
|
||||
// if !ok {
|
||||
// m = oldLen - b.off
|
||||
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
|
||||
// b.off = 0
|
||||
// b.buf = buf
|
||||
// }
|
||||
// return copy(b.buf[m:], p), nil
|
||||
}
|
||||
|
||||
247
utils/ps.go
247
utils/ps.go
@@ -4,31 +4,23 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/pion/rtp/v2"
|
||||
)
|
||||
|
||||
//
|
||||
const (
|
||||
UDPTransfer int = 0
|
||||
TCPTransferActive int = 1
|
||||
TCPTransferPassive int = 2
|
||||
LocalCache int = 3
|
||||
|
||||
StreamTypeH264 = 0x1b
|
||||
StreamTypeH265 = 0x24
|
||||
G711A = 0x90 //PCMA
|
||||
G7221AUDIOTYPE = 0x92
|
||||
G7231AUDIOTYPE = 0x93
|
||||
G729AUDIOTYPE = 0x99
|
||||
|
||||
StreamIDVideo = 0xe0
|
||||
StreamIDAudio = 0xc0
|
||||
|
||||
StartCodePS = 0x000001ba
|
||||
StartCodeSYS = 0x000001bb
|
||||
StartCodeMAP = 0x000001bc
|
||||
StartCodeVideo = 0x000001e0
|
||||
StartCodeAudio = 0x000001c0
|
||||
HaiKangCode = 0x000001bd
|
||||
PrivateStreamCode = 0x000001bd
|
||||
MEPGProgramEndCode = 0x000001b9
|
||||
|
||||
RTPHeaderLength int = 12
|
||||
@@ -119,11 +111,21 @@ type DecPSPackage struct {
|
||||
VideoStreamType uint32
|
||||
AudioStreamType uint32
|
||||
IOBuffer
|
||||
Payload []byte
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
Payload []byte
|
||||
Lack int //缺少字节数
|
||||
videoBuffer []byte
|
||||
audioBuffer []byte
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
Pusher
|
||||
}
|
||||
|
||||
func NewDecPSPackage(p Pusher) *DecPSPackage {
|
||||
p.PrintDump("<tr><td>")
|
||||
return &DecPSPackage{
|
||||
Pusher: p,
|
||||
}
|
||||
}
|
||||
func (dec *DecPSPackage) clean() {
|
||||
dec.systemClockReferenceBase = 0
|
||||
dec.systemClockReferenceExtension = 0
|
||||
@@ -138,116 +140,139 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return dec.ReadN(int(payloadlen))
|
||||
if l := int(payloadlen); dec.Len() >= l {
|
||||
dec.Lack = 0
|
||||
return dec.Next(l), nil
|
||||
} else {
|
||||
dec.Lack = l - dec.Len()
|
||||
}
|
||||
return dec.Next(dec.Len()), io.EOF
|
||||
}
|
||||
|
||||
//read the buffer and push video or audio
|
||||
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
|
||||
dec.clean()
|
||||
dec.PTS = ts
|
||||
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
// Drop 由于丢包引起的必须丢弃的数据
|
||||
func (dec *DecPSPackage) Drop() {
|
||||
dec.Reset()
|
||||
dec.videoBuffer = nil
|
||||
dec.audioBuffer = nil
|
||||
dec.Payload = nil
|
||||
}
|
||||
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
|
||||
ps := rtp.Payload
|
||||
if len(ps) < 4 {
|
||||
return nil
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
}
|
||||
var video []byte
|
||||
var nextStartCode uint32
|
||||
pusher.PrintDump("<td>")
|
||||
loop:
|
||||
for err == nil {
|
||||
if nextStartCode, err = dec.Uint32(); err != nil {
|
||||
break
|
||||
switch binary.BigEndian.Uint32(ps) {
|
||||
case StartCodePS, StartCodeSYS, StartCodeMAP, StartCodeVideo, StartCodeAudio, PrivateStreamCode, MEPGProgramEndCode:
|
||||
defer dec.Write(ps)
|
||||
if dec.Len() >= 4 {
|
||||
//说明需要处理PS包,处理完后,清空缓存
|
||||
defer dec.Reset()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
switch nextStartCode {
|
||||
case StartCodeSYS:
|
||||
pusher.PrintDump("[sys]")
|
||||
dec.ReadPayload()
|
||||
//err = dec.decSystemHeader()
|
||||
case StartCodeMAP:
|
||||
err = dec.decProgramStreamMap()
|
||||
pusher.PrintDump("[map]")
|
||||
case StartCodeVideo:
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
// if len(video) == 0 {
|
||||
// if dec.PTS == 0 {
|
||||
// dec.PTS = ts
|
||||
// }
|
||||
// // if dec.DTS == 0 {
|
||||
// // dec.DTS = dec.PTS
|
||||
// // }
|
||||
// }
|
||||
video = append(video, dec.Payload...)
|
||||
} else {
|
||||
fmt.Println("video", err)
|
||||
default:
|
||||
// 说明是中间数据,直接写入缓存,否则数据不合法需要丢弃
|
||||
if dec.Len() > 0 {
|
||||
dec.Write(ps)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
for dec.Len() >= 4 {
|
||||
code, _ := dec.Uint32()
|
||||
// println("code:", code)
|
||||
switch code {
|
||||
case StartCodePS:
|
||||
dec.PrintDump("</td></tr><tr><td>")
|
||||
if len(dec.audioBuffer) > 0 {
|
||||
dec.PushAudio(dec.PTS, dec.audioBuffer)
|
||||
dec.audioBuffer = nil
|
||||
}
|
||||
pusher.PrintDump("[video]")
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(dec.videoBuffer) > 0 {
|
||||
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
|
||||
dec.videoBuffer = nil
|
||||
}
|
||||
case StartCodeSYS:
|
||||
dec.PrintDump("</td><td>[sys]")
|
||||
dec.ReadPayload()
|
||||
case StartCodeMAP:
|
||||
dec.decProgramStreamMap()
|
||||
dec.PrintDump("</td><td>[map]")
|
||||
case StartCodeVideo:
|
||||
if dec.videoBuffer == nil {
|
||||
dec.PrintDump("</td><td>")
|
||||
}
|
||||
err = dec.decPESPacket()
|
||||
// if err != nil {
|
||||
//说明还有后续数据,需要继续处理
|
||||
// println(rtp.SequenceNumber)
|
||||
// }
|
||||
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
|
||||
dec.PrintDump("[video]")
|
||||
case StartCodeAudio:
|
||||
if dec.audioBuffer == nil {
|
||||
dec.PrintDump("</td><td>")
|
||||
}
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
ts := ts / 90
|
||||
if dec.PTS != 0 {
|
||||
ts = dec.PTS / 90
|
||||
}
|
||||
pusher.PushAudio(ts, dec.Payload)
|
||||
pusher.PrintDump("[audio]")
|
||||
dec.audioBuffer = append(dec.audioBuffer, dec.Payload...)
|
||||
dec.PrintDump("[audio]")
|
||||
} else {
|
||||
fmt.Println("audio", err)
|
||||
}
|
||||
case StartCodePS:
|
||||
break loop
|
||||
default:
|
||||
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
|
||||
case PrivateStreamCode:
|
||||
dec.ReadPayload()
|
||||
dec.PrintDump("</td></tr><tr><td>[ac3]")
|
||||
case MEPGProgramEndCode:
|
||||
dec.PrintDump("</td></tr>")
|
||||
return io.EOF
|
||||
default:
|
||||
fmt.Println("unknow code", code)
|
||||
return ErrParsePakcet
|
||||
}
|
||||
}
|
||||
if len(video) > 0 {
|
||||
pusher.PrintDump("</td>")
|
||||
pusher.PushVideo(dec.PTS, dec.DTS, video)
|
||||
video = nil
|
||||
}
|
||||
if nextStartCode == StartCodePS {
|
||||
// fmt.Println(aurora.Red("StartCodePS recursion..."), err)
|
||||
return dec.Read(ts, pusher)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
func (dec *DecPSPackage) decSystemHeader() error {
|
||||
syslens, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// drop rate video audio bound and lock flag
|
||||
syslens -= 6
|
||||
if err = dec.Skip(6); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ONE WAY: do not to parse the stream and skip the buffer
|
||||
//br.Skip(syslen * 8)
|
||||
|
||||
// TWO WAY: parse every stream info
|
||||
for syslens > 0 {
|
||||
if nextbits, err := dec.Uint8(); err != nil {
|
||||
return err
|
||||
} else if (nextbits&0x80)>>7 != 1 {
|
||||
break
|
||||
}
|
||||
if err = dec.Skip(2); err != nil {
|
||||
return err
|
||||
}
|
||||
syslens -= 3
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (dec *DecPSPackage) decSystemHeader() error {
|
||||
syslens, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// drop rate video audio bound and lock flag
|
||||
syslens -= 6
|
||||
if err = dec.Skip(6); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ONE WAY: do not to parse the stream and skip the buffer
|
||||
//br.Skip(syslen * 8)
|
||||
|
||||
// TWO WAY: parse every stream info
|
||||
for syslens > 0 {
|
||||
if nextbits, err := dec.Uint8(); err != nil {
|
||||
return err
|
||||
} else if (nextbits&0x80)>>7 != 1 {
|
||||
break
|
||||
}
|
||||
if err = dec.Skip(2); err != nil {
|
||||
return err
|
||||
}
|
||||
syslens -= 3
|
||||
}
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
psm, err := dec.ReadPayload()
|
||||
@@ -287,9 +312,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
|
||||
func (dec *DecPSPackage) decPESPacket() error {
|
||||
payload, err := dec.ReadPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(payload) < 4 {
|
||||
return errors.New("not enough data")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user