mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2025-12-24 13:27:57 +08:00
Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b82a0ffc4 | ||
|
|
940d7c5e59 | ||
|
|
2142a474a3 | ||
|
|
86e9bccb85 | ||
|
|
bb3a679a60 | ||
|
|
ecd97c8439 | ||
|
|
bfd71a72d8 | ||
|
|
c6bef8ccd8 | ||
|
|
20c0ac52cb | ||
|
|
34f5b7da79 | ||
|
|
0d3a795dc2 | ||
|
|
b52d457990 | ||
|
|
4a214cebeb | ||
|
|
8f78f992ca | ||
|
|
228d7b0cd2 | ||
|
|
e99150b0be | ||
|
|
31112e0052 | ||
|
|
8663b8e171 | ||
|
|
5960f07fc3 |
14
README.md
14
README.md
@@ -22,7 +22,7 @@ gb28181:
|
||||
prefetchrecord: false
|
||||
udpcachesize: 0
|
||||
sipnetwork: udp
|
||||
sipip:
|
||||
sipip: ""
|
||||
sipport: 5060
|
||||
serial: "34020000002000000001"
|
||||
realm: "3402000000"
|
||||
@@ -35,14 +35,15 @@ gb28181:
|
||||
heartbeatinterval: 60
|
||||
heartbeatretry: 3
|
||||
|
||||
mediaip:
|
||||
mediaip: ""
|
||||
mediaport: 58200
|
||||
mediaidletimeout: 30
|
||||
medianetwork: udp
|
||||
|
||||
mediaportmin: 0
|
||||
meidaportmax: 0
|
||||
|
||||
removebaninterval: 600
|
||||
loglevel: info
|
||||
audioenable: true
|
||||
```
|
||||
|
||||
- `AutoInvite` bool 表示自动发起invite,当Server(SIP)接收到设备信息时,立即向设备发送invite命令获取流
|
||||
@@ -68,12 +69,15 @@ gb28181:
|
||||
- `MediaPort` uint16 媒体服务器端口
|
||||
- `MediaNetwork` string 媒体传输协议,默认UDP,可选TCP
|
||||
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
|
||||
|
||||
- `MediaPortMin` uint16 媒体服务器端口范围最小值
|
||||
- `MediaPortMax` uint16 媒体服务器端口范围最大值
|
||||
- `AudioEnable` bool 是否开启音频
|
||||
- `LogLevel` string 日志级别,默认 info(trace,debug,info,warn,error,fatal, panic)
|
||||
- `RemoveBanInterval` int 定时移除注册失败的设备黑名单,单位秒,默认10分钟(600秒)
|
||||
- `UdpCacheSize` int 表示UDP缓存大小,默认为0,不开启。仅当TCP关闭,切缓存大于0时才开启,会最多缓存最多N个包,并排序,修复乱序造成的无法播放问题,注意开启后,会有一定的性能损耗,并丢失部分包。
|
||||
|
||||
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
|
||||
|
||||
**注意某些摄像机没有设置用户名的地方,摄像机会以自身的国标id作为用户名,这个时候m7s会忽略使用摄像机的用户名,忽略配置的用户名**
|
||||
如果设备配置了错误的用户名和密码,连续三次上报错误后,m7s会记录设备id,并在10分钟内禁止设备注册
|
||||
|
||||
|
||||
20
channel.go
20
channel.go
@@ -10,6 +10,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "m7s.live/engine/v4"
|
||||
"github.com/ghettovoice/gosip/sip"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -301,10 +302,10 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
protocol = "TCP/"
|
||||
if conf.tcpPorts.Valid {
|
||||
// opt.MediaPort, err = publisher.ListenTCP()
|
||||
// if err != nil {
|
||||
// return 500, err
|
||||
// }
|
||||
opt.MediaPort, err = publisher.ListenTCP()
|
||||
if err != nil {
|
||||
return 500, err
|
||||
}
|
||||
} else if opt.MediaPort == 0 {
|
||||
opt.MediaPort = conf.MediaPort
|
||||
}
|
||||
@@ -342,9 +343,9 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
"y=" + opt.ssrc,
|
||||
"",
|
||||
}
|
||||
// if config.IsMediaNetworkTCP() {
|
||||
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
// }
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
|
||||
}
|
||||
invite := channel.CreateRequst(sip.INVITE)
|
||||
contentType := sip.ContentType("application/sdp")
|
||||
invite.AppendHeader(&contentType)
|
||||
@@ -393,6 +394,11 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
|
||||
}
|
||||
|
||||
func (channel *Channel) Bye(live bool) int {
|
||||
d := channel.device
|
||||
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
|
||||
if s := Streams.Get(streamPath); s != nil {
|
||||
s.Close()
|
||||
}
|
||||
if live && channel.LivePublisher != nil {
|
||||
return channel.LivePublisher.Bye()
|
||||
}
|
||||
|
||||
129
publisher.go
129
publisher.go
@@ -1,7 +1,6 @@
|
||||
package gb28181
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -12,6 +11,8 @@ import (
|
||||
"github.com/pion/rtp/v2"
|
||||
"go.uber.org/zap"
|
||||
. "m7s.live/engine/v4"
|
||||
. "m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/plugin/gb28181/v4/utils"
|
||||
@@ -28,6 +29,7 @@ type GBPublisher struct {
|
||||
dumpFile *os.File
|
||||
dumpPrint io.Writer
|
||||
lastReceive time.Time
|
||||
reorder util.RTPReorder[*rtp.Packet]
|
||||
}
|
||||
|
||||
func (p *GBPublisher) PrintDump(s string) {
|
||||
@@ -108,15 +110,33 @@ func (p *GBPublisher) Bye() int {
|
||||
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
if p.VideoTrack == nil {
|
||||
switch p.parser.VideoStreamType {
|
||||
case utils.StreamTypeH264:
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
case utils.StreamTypeH265:
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
default:
|
||||
return
|
||||
//推测编码类型
|
||||
var maybe264 H264NALUType
|
||||
maybe264 = maybe264.Parse(payload[4])
|
||||
switch maybe264 {
|
||||
case NALU_Non_IDR_Picture,
|
||||
NALU_IDR_Picture,
|
||||
NALU_SEI,
|
||||
NALU_SPS,
|
||||
NALU_PPS,
|
||||
NALU_Access_Unit_Delimiter:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream)
|
||||
default:
|
||||
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream)
|
||||
}
|
||||
}
|
||||
}
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
if len(payload) > 10 {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
|
||||
} else {
|
||||
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
|
||||
}
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
@@ -125,73 +145,54 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
|
||||
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
|
||||
if p.AudioTrack == nil {
|
||||
switch p.parser.AudioStreamType {
|
||||
case utils.G711A:
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
at := NewG711(p.Publisher.Stream, true)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case utils.G711A + 1:
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
at := NewG711(p.Publisher.Stream, false)
|
||||
at.Audio.SampleRate = 8000
|
||||
at.Audio.SampleSize = 16
|
||||
at.Channels = 1
|
||||
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
|
||||
p.AudioTrack = at
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(payload[:7])
|
||||
default:
|
||||
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
p.AudioTrack.WriteRaw(ts, payload)
|
||||
}
|
||||
p.AudioTrack.WriteAVCC(ts, payload)
|
||||
}
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
|
||||
originRtp := *rtp
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
//序号小于第一个包的丢弃,rtp包序号达到65535后会从0开始,所以这里需要判断一下
|
||||
if rtp.SequenceNumber < p.lastSeq && p.lastSeq-rtp.SequenceNumber < utils.MaxRtpDiff {
|
||||
return
|
||||
}
|
||||
p.udpCache.Push(*rtp)
|
||||
rtpTmp, _ := p.udpCache.Pop()
|
||||
rtp = &rtpTmp
|
||||
if p.parser == nil {
|
||||
p.parser = utils.NewDecPSPackage(p)
|
||||
}
|
||||
ps := rtp.Payload
|
||||
if p.lastSeq != 0 {
|
||||
// rtp序号不连续,丢弃PS
|
||||
if p.lastSeq+1 != rtp.SequenceNumber {
|
||||
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
|
||||
if p.udpCache.Len() < conf.UdpCacheSize {
|
||||
p.udpCache.Push(*rtp)
|
||||
return
|
||||
} else {
|
||||
p.udpCache.Empty()
|
||||
rtp = &originRtp // 还原rtp包,而不是使用缓存中,避免rtp序号断裂
|
||||
if conf.IsMediaNetworkTCP() {
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
|
||||
if rtp.SequenceNumber != p.lastSeq+1 {
|
||||
p.parser.Drop()
|
||||
if p.VideoTrack != nil {
|
||||
p.VideoTrack.SetLostFlag()
|
||||
}
|
||||
}
|
||||
p.parser.Reset()
|
||||
p.parser.Feed(rtp)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
}
|
||||
}
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
if p.parser == nil {
|
||||
p.parser = new(utils.DecPSPackage)
|
||||
}
|
||||
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
|
||||
if p.parser.Len() > 0 {
|
||||
p.parser.Skip(4)
|
||||
p.PrintDump("</td></tr>")
|
||||
p.PrintDump("<tr>")
|
||||
p.parser.Read(rtp.Timestamp, p)
|
||||
p.PrintDump("</tr>")
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
p.parser.Reset()
|
||||
}
|
||||
p.parser.Write(ps)
|
||||
} else if p.parser.Len() > 0 {
|
||||
p.parser.Write(ps)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
@@ -205,7 +206,6 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
|
||||
p.PrintDump("<table>")
|
||||
defer p.PrintDump("</table>")
|
||||
}
|
||||
p.PrintDump("<tr class=gray><td colspan=12>")
|
||||
var t uint16
|
||||
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = f.Read(l)
|
||||
@@ -236,16 +236,19 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
|
||||
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
|
||||
conn, err := net.ListenUDP("udp", mediaAddr)
|
||||
if err != nil {
|
||||
conf.udpPorts.Recycle(port)
|
||||
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
p.SetIO(conn)
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
bufUDP := make([]byte, networkBuffer)
|
||||
plugin.Info("Media udp server start.", zap.Uint16("port", port))
|
||||
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
|
||||
defer conf.udpPorts.Recycle(port)
|
||||
dumpLen := make([]byte, 6)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
|
||||
ps := bufUDP[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
@@ -263,7 +266,37 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
|
||||
p.dumpFile.Write(ps)
|
||||
}
|
||||
p.PushPS(&rtpPacket)
|
||||
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
|
||||
port, err = conf.tcpPorts.GetPort()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
|
||||
listen, err := net.ListenTCP("tcp", mediaAddr)
|
||||
if err != nil {
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
go func() {
|
||||
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
|
||||
defer conf.tcpPorts.Recycle(port)
|
||||
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
|
||||
conn, err := listen.Accept()
|
||||
listen.Close()
|
||||
p.SetIO(conn)
|
||||
if err != nil {
|
||||
plugin.Error("Accept err=", zap.Error(err))
|
||||
return
|
||||
}
|
||||
processTcpMediaConn(conf, conn)
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ func (b *IOBuffer) ReadByte() (byte, error) {
|
||||
b.off++
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Reset() {
|
||||
b.buf = b.buf[:0]
|
||||
b.off = 0
|
||||
@@ -81,20 +82,24 @@ func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
|
||||
var ErrTooLarge = errors.New("IOBuffer: too large")
|
||||
|
||||
func (b *IOBuffer) Write(p []byte) (n int, err error) {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
panic(ErrTooLarge)
|
||||
}
|
||||
}()
|
||||
l := len(p)
|
||||
oldLen := len(b.buf)
|
||||
m, ok := b.tryGrowByReslice(l)
|
||||
if !ok {
|
||||
buf := make([]byte, oldLen+l)
|
||||
copy(buf, b.buf[b.off:])
|
||||
m = oldLen - b.off
|
||||
b.off = 0
|
||||
b.buf = buf
|
||||
}
|
||||
return copy(b.buf[m:], p), nil
|
||||
l := copy(b.buf, b.buf[b.off:])
|
||||
b.buf = append(b.buf[:l], p...)
|
||||
b.off = 0
|
||||
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
|
||||
return len(p), nil
|
||||
// defer func() {
|
||||
// if recover() != nil {
|
||||
// panic(ErrTooLarge)
|
||||
// }
|
||||
// }()
|
||||
// l := len(p)
|
||||
// oldLen := len(b.buf)
|
||||
// m, ok := b.tryGrowByReslice(l)
|
||||
// if !ok {
|
||||
// m = oldLen - b.off
|
||||
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
|
||||
// b.off = 0
|
||||
// b.buf = buf
|
||||
// }
|
||||
// return copy(b.buf[m:], p), nil
|
||||
}
|
||||
|
||||
264
utils/ps.go
264
utils/ps.go
@@ -4,31 +4,23 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/pion/rtp/v2"
|
||||
)
|
||||
|
||||
//
|
||||
const (
|
||||
UDPTransfer int = 0
|
||||
TCPTransferActive int = 1
|
||||
TCPTransferPassive int = 2
|
||||
LocalCache int = 3
|
||||
|
||||
StreamTypeH264 = 0x1b
|
||||
StreamTypeH265 = 0x24
|
||||
G711A = 0x90 //PCMA
|
||||
G7221AUDIOTYPE = 0x92
|
||||
G7231AUDIOTYPE = 0x93
|
||||
G729AUDIOTYPE = 0x99
|
||||
|
||||
StreamIDVideo = 0xe0
|
||||
StreamIDAudio = 0xc0
|
||||
|
||||
StartCodePS = 0x000001ba
|
||||
StartCodeSYS = 0x000001bb
|
||||
StartCodeMAP = 0x000001bc
|
||||
StartCodeVideo = 0x000001e0
|
||||
StartCodeAudio = 0x000001c0
|
||||
HaiKangCode = 0x000001bd
|
||||
PrivateStreamCode = 0x000001bd
|
||||
MEPGProgramEndCode = 0x000001b9
|
||||
|
||||
RTPHeaderLength int = 12
|
||||
@@ -112,142 +104,172 @@ type Pusher interface {
|
||||
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
|
||||
*/
|
||||
type DecPSPackage struct {
|
||||
systemClockReferenceBase uint64
|
||||
systemClockReferenceExtension uint64
|
||||
programMuxRate uint32
|
||||
// systemClockReferenceBase uint64
|
||||
// systemClockReferenceExtension uint64
|
||||
// programMuxRate uint32
|
||||
|
||||
VideoStreamType uint32
|
||||
AudioStreamType uint32
|
||||
IOBuffer
|
||||
Payload []byte
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
Payload []byte
|
||||
videoBuffer []byte
|
||||
audioBuffer []byte
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
Pusher
|
||||
}
|
||||
|
||||
func (dec *DecPSPackage) clean() {
|
||||
dec.systemClockReferenceBase = 0
|
||||
dec.systemClockReferenceExtension = 0
|
||||
dec.programMuxRate = 0
|
||||
dec.Payload = nil
|
||||
dec.PTS = 0
|
||||
dec.DTS = 0
|
||||
func NewDecPSPackage(p Pusher) *DecPSPackage {
|
||||
p.PrintDump("<tr><td>")
|
||||
return &DecPSPackage{
|
||||
Pusher: p,
|
||||
}
|
||||
}
|
||||
|
||||
// func (dec *DecPSPackage) clean() {
|
||||
// dec.systemClockReferenceBase = 0
|
||||
// dec.systemClockReferenceExtension = 0
|
||||
// dec.programMuxRate = 0
|
||||
// dec.Payload = nil
|
||||
// dec.PTS = 0
|
||||
// dec.DTS = 0
|
||||
// }
|
||||
|
||||
func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
|
||||
payloadlen, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return dec.ReadN(int(payloadlen))
|
||||
if l := int(payloadlen); dec.Len() >= l {
|
||||
return dec.Next(l), nil
|
||||
}
|
||||
return dec.Next(dec.Len()), io.EOF
|
||||
}
|
||||
|
||||
//read the buffer and push video or audio
|
||||
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
|
||||
dec.clean()
|
||||
dec.PTS = ts
|
||||
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
// Drop 由于丢包引起的必须丢弃的数据
|
||||
func (dec *DecPSPackage) Drop() {
|
||||
dec.Reset()
|
||||
dec.videoBuffer = nil
|
||||
dec.audioBuffer = nil
|
||||
dec.Payload = nil
|
||||
}
|
||||
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
|
||||
ps := rtp.Payload
|
||||
if len(ps) < 4 {
|
||||
return nil
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
}
|
||||
var video []byte
|
||||
var nextStartCode uint32
|
||||
pusher.PrintDump("<td>")
|
||||
loop:
|
||||
for err == nil {
|
||||
if nextStartCode, err = dec.Uint32(); err != nil {
|
||||
break
|
||||
switch binary.BigEndian.Uint32(ps) {
|
||||
case StartCodePS, StartCodeSYS, StartCodeMAP, StartCodeVideo, StartCodeAudio, PrivateStreamCode, MEPGProgramEndCode:
|
||||
defer dec.Write(ps)
|
||||
if dec.Len() >= 4 {
|
||||
//说明需要处理PS包,处理完后,清空缓存
|
||||
defer dec.Reset()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
switch nextStartCode {
|
||||
case StartCodeSYS:
|
||||
pusher.PrintDump("[sys]")
|
||||
dec.ReadPayload()
|
||||
//err = dec.decSystemHeader()
|
||||
case StartCodeMAP:
|
||||
err = dec.decProgramStreamMap()
|
||||
pusher.PrintDump("[map]")
|
||||
case StartCodeVideo:
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
// if len(video) == 0 {
|
||||
// if dec.PTS == 0 {
|
||||
// dec.PTS = ts
|
||||
// }
|
||||
// // if dec.DTS == 0 {
|
||||
// // dec.DTS = dec.PTS
|
||||
// // }
|
||||
// }
|
||||
video = append(video, dec.Payload...)
|
||||
} else {
|
||||
fmt.Println("video", err)
|
||||
default:
|
||||
// 说明是中间数据,直接写入缓存,否则数据不合法需要丢弃
|
||||
if dec.Len() > 0 {
|
||||
dec.Write(ps)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
for dec.Len() >= 4 {
|
||||
code, _ := dec.Uint32()
|
||||
// println("code:", code)
|
||||
switch code {
|
||||
case StartCodePS:
|
||||
dec.PrintDump("</td></tr><tr><td>")
|
||||
if len(dec.audioBuffer) > 0 {
|
||||
dec.PushAudio(dec.PTS, dec.audioBuffer)
|
||||
dec.audioBuffer = nil
|
||||
}
|
||||
pusher.PrintDump("[video]")
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(dec.videoBuffer) > 0 {
|
||||
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
|
||||
dec.videoBuffer = nil
|
||||
}
|
||||
case StartCodeSYS:
|
||||
dec.PrintDump("</td><td>[sys]")
|
||||
dec.ReadPayload()
|
||||
case StartCodeMAP:
|
||||
dec.decProgramStreamMap()
|
||||
dec.PrintDump("</td><td>[map]")
|
||||
case StartCodeVideo:
|
||||
if dec.videoBuffer == nil {
|
||||
dec.PrintDump("</td><td>")
|
||||
}
|
||||
err = dec.decPESPacket()
|
||||
// if err != nil {
|
||||
//说明还有后续数据,需要继续处理
|
||||
// println(rtp.SequenceNumber)
|
||||
// }
|
||||
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
|
||||
dec.PrintDump("[video]")
|
||||
case StartCodeAudio:
|
||||
if dec.audioBuffer == nil {
|
||||
dec.PrintDump("</td><td>")
|
||||
}
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
ts := ts / 90
|
||||
if dec.PTS != 0 {
|
||||
ts = dec.PTS / 90
|
||||
}
|
||||
pusher.PushAudio(ts, dec.Payload)
|
||||
pusher.PrintDump("[audio]")
|
||||
dec.audioBuffer = append(dec.audioBuffer, dec.Payload...)
|
||||
dec.PrintDump("[audio]")
|
||||
} else {
|
||||
fmt.Println("audio", err)
|
||||
}
|
||||
case StartCodePS:
|
||||
break loop
|
||||
default:
|
||||
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
|
||||
case PrivateStreamCode:
|
||||
dec.ReadPayload()
|
||||
dec.PrintDump("</td></tr><tr><td>[ac3]")
|
||||
case MEPGProgramEndCode:
|
||||
dec.PrintDump("</td></tr>")
|
||||
return io.EOF
|
||||
default:
|
||||
fmt.Println("unknow code", code)
|
||||
return ErrParsePakcet
|
||||
}
|
||||
}
|
||||
if len(video) > 0 {
|
||||
pusher.PrintDump("</td>")
|
||||
pusher.PushVideo(dec.PTS, dec.DTS, video)
|
||||
video = nil
|
||||
}
|
||||
if nextStartCode == StartCodePS {
|
||||
// fmt.Println(aurora.Red("StartCodePS recursion..."), err)
|
||||
return dec.Read(ts, pusher)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
func (dec *DecPSPackage) decSystemHeader() error {
|
||||
syslens, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// drop rate video audio bound and lock flag
|
||||
syslens -= 6
|
||||
if err = dec.Skip(6); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ONE WAY: do not to parse the stream and skip the buffer
|
||||
//br.Skip(syslen * 8)
|
||||
|
||||
// TWO WAY: parse every stream info
|
||||
for syslens > 0 {
|
||||
if nextbits, err := dec.Uint8(); err != nil {
|
||||
return err
|
||||
} else if (nextbits&0x80)>>7 != 1 {
|
||||
break
|
||||
}
|
||||
if err = dec.Skip(2); err != nil {
|
||||
return err
|
||||
}
|
||||
syslens -= 3
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (dec *DecPSPackage) decSystemHeader() error {
|
||||
syslens, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// drop rate video audio bound and lock flag
|
||||
syslens -= 6
|
||||
if err = dec.Skip(6); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ONE WAY: do not to parse the stream and skip the buffer
|
||||
//br.Skip(syslen * 8)
|
||||
|
||||
// TWO WAY: parse every stream info
|
||||
for syslens > 0 {
|
||||
if nextbits, err := dec.Uint8(); err != nil {
|
||||
return err
|
||||
} else if (nextbits&0x80)>>7 != 1 {
|
||||
break
|
||||
}
|
||||
if err = dec.Skip(2); err != nil {
|
||||
return err
|
||||
}
|
||||
syslens -= 3
|
||||
}
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
psm, err := dec.ReadPayload()
|
||||
@@ -287,9 +309,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
|
||||
func (dec *DecPSPackage) decPESPacket() error {
|
||||
payload, err := dec.ReadPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(payload) < 4 {
|
||||
return errors.New("not enough data")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user