Compare commits

...

10 Commits

Author SHA1 Message Date
dexter
2b82a0ffc4 📦 NEW: 设置丢帧标志 2022-11-22 11:18:21 +08:00
dexter
940d7c5e59 🐛 FIX: 优化PS解析 2022-11-17 23:47:20 +08:00
dexter
2142a474a3 🐛 FIX: 大PES分包机制 2022-11-17 11:53:36 +08:00
dexter
86e9bccb85 🐛 FIX: 增加对AAC的支持以及多slice合并发送 2022-11-13 23:30:45 +08:00
dexter
bb3a679a60 音频时间戳转换 2022-10-25 21:38:06 +08:00
dexter
ecd97c8439 🐛 FIX: tcp解析 2022-10-25 15:35:00 +08:00
dexter
bfd71a72d8 Merge pull request #74 from hongri8488/v4
添加28181 bye接口调用时将流关闭
2022-10-25 14:44:39 +08:00
dexter
c6bef8ccd8 🐛 FIX: udp范围端口增加超时功能 2022-10-25 14:44:03 +08:00
hongri8488
20c0ac52cb 添加28181 bye接口调用时将流关闭 2022-10-25 13:51:31 +08:00
dexter
34f5b7da79 🐛 FIX: 推测h264的时候取的下标错误 2022-10-24 10:54:03 +08:00
4 changed files with 91 additions and 62 deletions

View File

@@ -22,7 +22,7 @@ gb28181:
prefetchrecord: false
udpcachesize: 0
sipnetwork: udp
sipip:
sipip: ""
sipport: 5060
serial: "34020000002000000001"
realm: "3402000000"
@@ -35,14 +35,15 @@ gb28181:
heartbeatinterval: 60
heartbeatretry: 3
mediaip:
mediaip: ""
mediaport: 58200
mediaidletimeout: 30
medianetwork: udp
mediaportmin: 0
meidaportmax: 0
removebaninterval: 600
loglevel: info
audioenable: true
```
- `AutoInvite` bool 表示自动发起invite当ServerSIP接收到设备信息时立即向设备发送invite命令获取流
@@ -68,12 +69,15 @@ gb28181:
- `MediaPort` uint16 媒体服务器端口
- `MediaNetwork` string 媒体传输协议默认UDP可选TCP
- `MediaIdleTimeout` uint16 推流超时时间,超过则断开链接,让设备重连
- `MediaPortMin` uint16 媒体服务器端口范围最小值
- `MediaPortMax` uint16 媒体服务器端口范围最大值
- `AudioEnable` bool 是否开启音频
- `LogLevel` string 日志级别,默认 infotracedebuginfowarnerrorfatal, panic
- `RemoveBanInterval` int 定时移除注册失败的设备黑名单单位秒默认10分钟600秒
- `UdpCacheSize` int 表示UDP缓存大小默认为0不开启。仅当TCP关闭切缓存大于0时才开启会最多缓存最多N个包并排序修复乱序造成的无法播放问题注意开启后会有一定的性能损耗并丢失部分包。
**如果配置了端口范围,将采用范围端口机制,每一个流对应一个端口
**注意某些摄像机没有设置用户名的地方摄像机会以自身的国标id作为用户名这个时候m7s会忽略使用摄像机的用户名忽略配置的用户名**
如果设备配置了错误的用户名和密码连续三次上报错误后m7s会记录设备id并在10分钟内禁止设备注册

View File

@@ -10,6 +10,7 @@ import (
"sync"
"time"
. "m7s.live/engine/v4"
"github.com/ghettovoice/gosip/sip"
"go.uber.org/zap"
"m7s.live/plugin/gb28181/v4/utils"
@@ -393,6 +394,11 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
}
func (channel *Channel) Bye(live bool) int {
d := channel.device
streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
if s := Streams.Get(streamPath); s != nil {
s.Close()
}
if live && channel.LivePublisher != nil {
return channel.LivePublisher.Bye()
}

View File

@@ -12,6 +12,7 @@ import (
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/codec/mpegts"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
@@ -109,18 +110,24 @@ func (p *GBPublisher) Bye() int {
func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
if p.VideoTrack == nil {
switch p.parser.VideoStreamType {
case utils.StreamTypeH264:
case mpegts.STREAM_TYPE_H264:
p.VideoTrack = NewH264(p.Publisher.Stream)
case utils.StreamTypeH265:
case mpegts.STREAM_TYPE_H265:
p.VideoTrack = NewH265(p.Publisher.Stream)
default:
//推测编码类型
var maybe264 H264NALUType
maybe264 = maybe264.Parse(payload[5])
maybe264 = maybe264.Parse(payload[4])
switch maybe264 {
case NALU_Non_IDR_Picture, NALU_IDR_Picture, NALU_SEI, NALU_SPS, NALU_PPS:
case NALU_Non_IDR_Picture,
NALU_IDR_Picture,
NALU_SEI,
NALU_SPS,
NALU_PPS,
NALU_Access_Unit_Delimiter:
p.VideoTrack = NewH264(p.Publisher.Stream)
default:
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
p.VideoTrack = NewH265(p.Publisher.Stream)
}
}
@@ -138,44 +145,51 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
if p.AudioTrack == nil {
switch p.parser.AudioStreamType {
case utils.G711A:
case mpegts.STREAM_TYPE_G711A:
at := NewG711(p.Publisher.Stream, true)
at.Audio.SampleRate = 8000
at.Audio.SampleSize = 16
at.Channels = 1
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
p.AudioTrack = at
case utils.G711A + 1:
case mpegts.STREAM_TYPE_G711U:
at := NewG711(p.Publisher.Stream, false)
at.Audio.SampleRate = 8000
at.Audio.SampleSize = 16
at.Channels = 1
at.AVCCHead = []byte{(byte(at.CodecID) << 4) | (1 << 1)}
p.AudioTrack = at
case mpegts.STREAM_TYPE_AAC:
p.AudioTrack = NewAAC(p.Publisher.Stream)
p.WriteADTS(payload[:7])
default:
p.Error("audio type not supported yet", zap.Uint32("type", p.parser.AudioStreamType))
return
}
} else {
p.AudioTrack.WriteRaw(ts, payload)
}
p.AudioTrack.WriteAVCC(ts, payload)
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
if !conf.IsMediaNetworkTCP() {
rtp = p.reorder.Push(rtp.SequenceNumber, rtp)
} else {
p.lastSeq = rtp.SequenceNumber
}
if p.parser == nil {
p.parser = utils.NewDecPSPackage(p)
}
for rtp != nil {
if rtp.SequenceNumber != p.lastSeq+1 {
p.parser.Drop()
}
p.parser.Feed(rtp.Payload)
if conf.IsMediaNetworkTCP() {
p.parser.Feed(rtp)
p.lastSeq = rtp.SequenceNumber
rtp = p.reorder.Pop()
} else {
for rtp = p.reorder.Push(rtp.SequenceNumber, rtp); rtp != nil; rtp = p.reorder.Pop() {
if rtp.SequenceNumber != p.lastSeq+1 {
p.parser.Drop()
if p.VideoTrack != nil {
p.VideoTrack.SetLostFlag()
}
}
p.parser.Feed(rtp)
p.lastSeq = rtp.SequenceNumber
}
}
}
@@ -222,16 +236,19 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
mediaAddr, _ := net.ResolveUDPAddr("udp", addr)
conn, err := net.ListenUDP("udp", mediaAddr)
if err != nil {
conf.udpPorts.Recycle(port)
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
defer conn.Close()
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
@@ -249,6 +266,7 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
p.dumpFile.Write(ps)
}
p.PushPS(&rtpPacket)
conn.SetReadDeadline(time.Now().Add(time.Second * 10))
}
}()
return
@@ -263,6 +281,7 @@ func (p *GBPublisher) ListenTCP() (port uint16, err error) {
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
defer conf.tcpPorts.Recycle(port)
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}

View File

@@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"io"
"github.com/pion/rtp/v2"
)
const (
@@ -13,16 +15,6 @@ const (
TCPTransferPassive int = 2
LocalCache int = 3
StreamTypeH264 = 0x1b
StreamTypeH265 = 0x24
G711A = 0x90 //PCMA
G7221AUDIOTYPE = 0x92
G7231AUDIOTYPE = 0x93
G729AUDIOTYPE = 0x99
StreamIDVideo = 0xe0
StreamIDAudio = 0xc0
StartCodePS = 0x000001ba
StartCodeSYS = 0x000001bb
StartCodeMAP = 0x000001bc
@@ -112,9 +104,9 @@ type Pusher interface {
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
*/
type DecPSPackage struct {
systemClockReferenceBase uint64
systemClockReferenceExtension uint64
programMuxRate uint32
// systemClockReferenceBase uint64
// systemClockReferenceExtension uint64
// programMuxRate uint32
VideoStreamType uint32
AudioStreamType uint32
@@ -133,21 +125,25 @@ func NewDecPSPackage(p Pusher) *DecPSPackage {
Pusher: p,
}
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
dec.programMuxRate = 0
dec.Payload = nil
dec.PTS = 0
dec.DTS = 0
}
// func (dec *DecPSPackage) clean() {
// dec.systemClockReferenceBase = 0
// dec.systemClockReferenceExtension = 0
// dec.programMuxRate = 0
// dec.Payload = nil
// dec.PTS = 0
// dec.DTS = 0
// }
func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
payloadlen, err := dec.Uint16()
if err != nil {
return
}
return dec.ReadN(int(payloadlen))
if l := int(payloadlen); dec.Len() >= l {
return dec.Next(l), nil
}
return dec.Next(dec.Len()), io.EOF
}
// Drop 由于丢包引起的必须丢弃的数据
@@ -158,14 +154,21 @@ func (dec *DecPSPackage) Drop() {
dec.Payload = nil
}
func (dec *DecPSPackage) Feed(ps []byte) (err error) {
if ps[0] == 0 && ps[1] == 0 && ps[2] == 1 {
func (dec *DecPSPackage) Feed(rtp *rtp.Packet) (err error) {
ps := rtp.Payload
if len(ps) < 4 {
return nil
}
switch binary.BigEndian.Uint32(ps) {
case StartCodePS, StartCodeSYS, StartCodeMAP, StartCodeVideo, StartCodeAudio, PrivateStreamCode, MEPGProgramEndCode:
defer dec.Write(ps)
if dec.Len() >= 4 {
//说明需要处理PS包处理完后清空缓存
defer dec.Reset()
} else {
return
}
} else {
default:
// 说明是中间数据,直接写入缓存,否则数据不合法需要丢弃
if dec.Len() > 0 {
dec.Write(ps)
@@ -193,6 +196,10 @@ func (dec *DecPSPackage) Feed(ps []byte) (err error) {
if err = dec.Skip(int(psl)); err != nil {
return err
}
if len(dec.videoBuffer) > 0 {
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
dec.videoBuffer = nil
}
case StartCodeSYS:
dec.PrintDump("</td><td>[sys]")
dec.ReadPayload()
@@ -203,17 +210,12 @@ func (dec *DecPSPackage) Feed(ps []byte) (err error) {
if dec.videoBuffer == nil {
dec.PrintDump("</td><td>")
}
if err = dec.decPESPacket(); err == nil {
if dec.Payload[0] == 0 && dec.Payload[1] == 0 && dec.Payload[2] == 0 && dec.Payload[3] == 1 {
if len(dec.videoBuffer) > 0 {
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
dec.videoBuffer = nil
}
}
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
} else {
fmt.Println("video", err)
}
err = dec.decPESPacket()
// if err != nil {
//说明还有后续数据,需要继续处理
// println(rtp.SequenceNumber)
// }
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
dec.PrintDump("[video]")
case StartCodeAudio:
if dec.audioBuffer == nil {
@@ -307,9 +309,7 @@ func (dec *DecPSPackage) decProgramStreamMap() error {
func (dec *DecPSPackage) decPESPacket() error {
payload, err := dec.ReadPayload()
if err != nil {
return err
}
if len(payload) < 4 {
return errors.New("not enough data")
}