Compare commits

..

14 Commits

Author SHA1 Message Date
dexter
0d3a795dc2 👌 IMPROVE: 细粒度拼凑视频帧 2022-10-23 11:39:04 +08:00
dexter
b52d457990 🐛 FIX: 解决udp丢包引起的ps包组包错乱问题 2022-10-22 09:22:46 +08:00
dexter
4a214cebeb 🐛 FIX: PS解析 2022-10-21 16:16:44 +08:00
dexter
8f78f992ca Merge pull request #73 from hongri8488/v4
修改tcp请求部分设备不能拉流的问题
2022-10-21 11:52:36 +08:00
zhangdongfang
228d7b0cd2 修改tcp请求部分设备不能拉流的问题 2022-10-21 10:49:31 +08:00
dexter
e99150b0be Merge pull request #72 from hongri8488/v4
添加tcp多端口端口回收
2022-10-20 15:07:21 +08:00
zhangdongfang
31112e0052 添加tcp多端口端口回收 2022-10-20 15:03:18 +08:00
dexter
8663b8e171 Merge pull request #71 from hongri8488/v4
添加gb28181 tcp多端口
2022-10-20 14:55:32 +08:00
zhangdongfang
5960f07fc3 添加gb28181 tcp多端口 2022-10-20 14:49:31 +08:00
dexter
eb6004d6ef 🐛 FIX: 防止dts自动生成 2022-10-17 11:37:30 +08:00
dexter
cce5f67ab9 🐛 FIX: initRoutes索引越界 2022-10-13 16:23:32 +08:00
dexter
fdfb462d46 Merge pull request #70 from WXC9102/v4
更新通道panic
2022-09-26 18:52:23 +08:00
weixuechao
c05adce562 1.更新通道时会偶发panic, 2.新增/删除通道信息没有更新 2022-09-26 16:54:55 +08:00
dexter
aa3727f582 🐛 FIX: unlock时的判空 2022-09-19 00:21:48 +08:00
6 changed files with 242 additions and 167 deletions

View File

@@ -51,6 +51,28 @@ type Channel struct {
*ChannelEx //自定义属性
}
func (c *Channel) Copy(v *Channel) {
if v == nil {
return
}
c.DeviceID = v.DeviceID
c.ParentID = v.ParentID
c.Name = v.Name
c.Manufacturer = v.Manufacturer
c.Model = v.Model
c.Owner = v.Owner
c.CivilCode = v.CivilCode
c.Address = v.Address
c.Parental = v.Parental
c.SafetyWay = v.SafetyWay
c.RegisterWay = v.RegisterWay
c.Secrecy = v.Secrecy
c.Status = v.Status
c.Status = v.Status
c.ChannelEx = v.ChannelEx
}
func (c *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
d := c.device
d.sn++
@@ -279,10 +301,10 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
if conf.IsMediaNetworkTCP() {
protocol = "TCP/"
if conf.tcpPorts.Valid {
// opt.MediaPort, err = publisher.ListenTCP()
// if err != nil {
// return 500, err
// }
opt.MediaPort, err = publisher.ListenTCP()
if err != nil {
return 500, err
}
} else if opt.MediaPort == 0 {
opt.MediaPort = conf.MediaPort
}
@@ -320,9 +342,9 @@ func (channel *Channel) Invite(opt InviteOptions) (code int, err error) {
"y=" + opt.ssrc,
"",
}
// if config.IsMediaNetworkTCP() {
// sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
// }
if conf.IsMediaNetworkTCP() {
sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
}
invite := channel.CreateRequst(sip.INVITE)
contentType := sip.ContentType("application/sdp")
invite.AppendHeader(&contentType)

View File

@@ -233,10 +233,13 @@ func (d *Device) UpdateChannels(list []*Channel) {
go c.QueryRecord(n.Format(TIME_LAYOUT), n.Add(time.Hour*24-time.Second).Format(TIME_LAYOUT))
}
}
old.Copy(c)
c = old
} else {
c.ChannelEx = &ChannelEx{
device: d,
}
d.channelMap[c.DeviceID] = c
}
if conf.AutoInvite && (c.LivePublisher == nil) {
go c.Invite(InviteOptions{})
@@ -246,7 +249,6 @@ func (d *Device) UpdateChannels(list []*Channel) {
} else {
c.LiveSubSP = ""
}
d.channelMap[c.DeviceID] = c
}
}
func (d *Device) UpdateRecord(channelId string, list []*Record) {
@@ -456,7 +458,7 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
d.channelOffline(v.DeviceID)
case "ADD":
plugin.Debug("收到通道新增通知")
channel := Channel{
channel := &Channel{
DeviceID: v.DeviceID,
ParentID: v.ParentID,
Name: v.Name,
@@ -471,11 +473,12 @@ func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
Secrecy: v.Secrecy,
Status: v.Status,
}
d.addChannel(&channel)
channels := []*Channel{channel}
d.UpdateChannels(channels)
case "DEL":
//删除
plugin.Debug("收到通道删除通知")
delete(d.channelMap, v.DeviceID)
d.channelOffline(v.DeviceID)
case "UPDATE":
plugin.Debug("收到通道更新通知")
// 更新通道

View File

@@ -52,7 +52,9 @@ func (c *GB28181Config) initRoutes() {
tempIps := myip.LocalAndInternalIPs()
for k, v := range tempIps {
c.routes[k] = v
c.routes[k[0:strings.LastIndex(k, ".")]] = k
if lastdot := strings.LastIndex(k, "."); lastdot >= 0 {
c.routes[k[0:lastdot]] = k
}
}
plugin.Info(fmt.Sprintf("LocalAndInternalIPs detail: %s", c.routes))
}

View File

@@ -1,7 +1,6 @@
package gb28181
import (
"encoding/binary"
"fmt"
"io"
"net"
@@ -12,6 +11,7 @@ import (
"github.com/pion/rtp/v2"
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/gb28181/v4/utils"
@@ -28,6 +28,7 @@ type GBPublisher struct {
dumpFile *os.File
dumpPrint io.Writer
lastReceive time.Time
reorder util.RTPReorder[*rtp.Packet]
}
func (p *GBPublisher) PrintDump(s string) {
@@ -59,14 +60,18 @@ func (p *GBPublisher) OnEvent(event any) {
case SEwaitPublish:
//掉线自动重新拉流
if p.IsLive() {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
go p.channel.Invite(InviteOptions{})
}
case SEclose, SEKick:
if p.IsLive() {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
if p.channel.LivePublisher != nil {
p.channel.LivePublisher = nil
p.channel.liveInviteLock.Unlock()
}
} else {
p.channel.RecordPublisher = nil
}
@@ -109,10 +114,25 @@ func (p *GBPublisher) PushVideo(pts uint32, dts uint32, payload []byte) {
case utils.StreamTypeH265:
p.VideoTrack = NewH265(p.Publisher.Stream)
default:
return
//推测编码类型
var maybe264 H264NALUType
maybe264 = maybe264.Parse(payload[5])
switch maybe264 {
case NALU_Non_IDR_Picture, NALU_IDR_Picture, NALU_SEI, NALU_SPS, NALU_PPS:
p.VideoTrack = NewH264(p.Publisher.Stream)
default:
p.VideoTrack = NewH265(p.Publisher.Stream)
}
}
}
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
if len(payload) > 10 {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload[:10]))
} else {
p.PrintDump(fmt.Sprintf("<td>pts:%d dts:%d data: % 2X</td>", pts, dts, payload))
}
if dts == 0 {
dts = pts
}
p.VideoTrack.WriteAnnexB(pts, dts, payload)
}
func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
@@ -139,52 +159,26 @@ func (p *GBPublisher) PushAudio(ts uint32, payload []byte) {
p.AudioTrack.WriteAVCC(ts, payload)
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *GBPublisher) PushPS(rtp *rtp.Packet) {
originRtp := *rtp
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
//序号小于第一个包的丢弃,rtp包序号达到65535后会从0开始所以这里需要判断一下
if rtp.SequenceNumber < p.lastSeq && p.lastSeq-rtp.SequenceNumber < utils.MaxRtpDiff {
return
}
p.udpCache.Push(*rtp)
rtpTmp, _ := p.udpCache.Pop()
rtp = &rtpTmp
if !conf.IsMediaNetworkTCP() {
rtp = p.reorder.Push(rtp.SequenceNumber, rtp)
} else {
p.lastSeq = rtp.SequenceNumber
}
ps := rtp.Payload
if p.lastSeq != 0 {
// rtp序号不连续丢弃PS
if p.lastSeq+1 != rtp.SequenceNumber {
if conf.UdpCacheSize > 0 && !conf.IsMediaNetworkTCP() {
if p.udpCache.Len() < conf.UdpCacheSize {
p.udpCache.Push(*rtp)
return
} else {
p.udpCache.Empty()
rtp = &originRtp // 还原rtp包而不是使用缓存中避免rtp序号断裂
}
}
p.parser.Reset()
}
}
p.lastSeq = rtp.SequenceNumber
if p.parser == nil {
p.parser = new(utils.DecPSPackage)
p.parser = utils.NewDecPSPackage(p)
}
if len(ps) >= 4 && binary.BigEndian.Uint32(ps) == utils.StartCodePS {
if p.parser.Len() > 0 {
p.parser.Skip(4)
p.PrintDump("</td></tr>")
p.PrintDump("<tr>")
p.parser.Read(rtp.Timestamp, p)
p.PrintDump("</tr>")
p.PrintDump("<tr class=gray><td colspan=12>")
p.parser.Reset()
for rtp != nil {
if rtp.SequenceNumber != p.lastSeq+1 {
p.parser.Drop()
}
p.parser.Write(ps)
} else if p.parser.Len() > 0 {
p.parser.Write(ps)
p.parser.Feed(rtp.Payload)
p.lastSeq = rtp.SequenceNumber
rtp = p.reorder.Pop()
}
}
func (p *GBPublisher) Replay(f *os.File) (err error) {
var rtpPacket rtp.Packet
defer f.Close()
@@ -198,7 +192,6 @@ func (p *GBPublisher) Replay(f *os.File) (err error) {
p.PrintDump("<table>")
defer p.PrintDump("</table>")
}
p.PrintDump("<tr class=gray><td colspan=12>")
var t uint16
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
_, err = f.Read(l)
@@ -232,10 +225,12 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
plugin.Error("listen media server udp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
p.SetIO(conn)
go func() {
bufUDP := make([]byte, networkBuffer)
plugin.Info("Media udp server start.", zap.Uint16("port", port))
defer plugin.Info("Media udp server stop", zap.Uint16("port", port))
defer conf.udpPorts.Recycle(port)
dumpLen := make([]byte, 6)
for n, _, err := conn.ReadFromUDP(bufUDP); err == nil; n, _, err = conn.ReadFromUDP(bufUDP) {
ps := bufUDP[:n]
@@ -258,3 +253,31 @@ func (p *GBPublisher) ListenUDP() (port uint16, err error) {
}()
return
}
func (p *GBPublisher) ListenTCP() (port uint16, err error) {
port, err = conf.tcpPorts.GetPort()
if err != nil {
return
}
addr := fmt.Sprintf(":%d", port)
mediaAddr, _ := net.ResolveTCPAddr("tcp", addr)
listen, err := net.ListenTCP("tcp", mediaAddr)
if err != nil {
plugin.Error("listen media server tcp err", zap.String("addr", addr), zap.Error(err))
return 0, err
}
go func() {
plugin.Info("Media tcp server start.", zap.Uint16("port", port))
defer conf.tcpPorts.Recycle(port)
defer plugin.Info("Media tcp server stop", zap.Uint16("port", port))
conn, err := listen.Accept()
listen.Close()
p.SetIO(conn)
if err != nil {
plugin.Error("Accept err=", zap.Error(err))
return
}
processTcpMediaConn(conf, conn)
}()
return
}

View File

@@ -60,6 +60,7 @@ func (b *IOBuffer) ReadByte() (byte, error) {
b.off++
return c, nil
}
func (b *IOBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
@@ -81,20 +82,24 @@ func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
var ErrTooLarge = errors.New("IOBuffer: too large")
func (b *IOBuffer) Write(p []byte) (n int, err error) {
defer func() {
if recover() != nil {
panic(ErrTooLarge)
}
}()
l := len(p)
oldLen := len(b.buf)
m, ok := b.tryGrowByReslice(l)
if !ok {
buf := make([]byte, oldLen+l)
copy(buf, b.buf[b.off:])
m = oldLen - b.off
b.off = 0
b.buf = buf
}
return copy(b.buf[m:], p), nil
l := copy(b.buf, b.buf[b.off:])
b.buf = append(b.buf[:l], p...)
b.off = 0
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
return len(p), nil
// defer func() {
// if recover() != nil {
// panic(ErrTooLarge)
// }
// }()
// l := len(p)
// oldLen := len(b.buf)
// m, ok := b.tryGrowByReslice(l)
// if !ok {
// m = oldLen - b.off
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
// b.off = 0
// b.buf = buf
// }
// return copy(b.buf[m:], p), nil
}

View File

@@ -4,9 +4,9 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
)
//
const (
UDPTransfer int = 0
TCPTransferActive int = 1
@@ -28,7 +28,7 @@ const (
StartCodeMAP = 0x000001bc
StartCodeVideo = 0x000001e0
StartCodeAudio = 0x000001c0
HaiKangCode = 0x000001bd
PrivateStreamCode = 0x000001bd
MEPGProgramEndCode = 0x000001b9
RTPHeaderLength int = 12
@@ -119,11 +119,20 @@ type DecPSPackage struct {
VideoStreamType uint32
AudioStreamType uint32
IOBuffer
Payload []byte
PTS uint32
DTS uint32
Payload []byte
videoBuffer []byte
audioBuffer []byte
PTS uint32
DTS uint32
Pusher
}
func NewDecPSPackage(p Pusher) *DecPSPackage {
p.PrintDump("<tr><td>")
return &DecPSPackage{
Pusher: p,
}
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
@@ -141,113 +150,124 @@ func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
return dec.ReadN(int(payloadlen))
}
//read the buffer and push video or audio
func (dec *DecPSPackage) Read(ts uint32, pusher Pusher) error {
dec.clean()
dec.PTS = ts
pusher.PrintDump(fmt.Sprintf("<td>%d</td>", ts))
if err := dec.Skip(9); err != nil {
return err
}
// Drop 由于丢包引起的必须丢弃的数据
func (dec *DecPSPackage) Drop() {
dec.Reset()
dec.videoBuffer = nil
dec.audioBuffer = nil
dec.Payload = nil
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
var video []byte
var nextStartCode uint32
pusher.PrintDump("<td>")
loop:
for err == nil {
if nextStartCode, err = dec.Uint32(); err != nil {
break
func (dec *DecPSPackage) Feed(ps []byte) (err error) {
if ps[0] == 0 && ps[1] == 0 && ps[2] == 1 {
defer dec.Write(ps)
if dec.Len() >= 4 {
//说明需要处理PS包处理完后清空缓存
defer dec.Reset()
}
switch nextStartCode {
} else {
// 说明是中间数据,直接写入缓存,否则数据不合法需要丢弃
if dec.Len() > 0 {
dec.Write(ps)
}
return nil
}
for dec.Len() >= 4 {
code, _ := dec.Uint32()
// println("code:", code)
switch code {
case StartCodePS:
dec.PrintDump("</td></tr><tr><td>")
if len(dec.audioBuffer) > 0 {
dec.PushAudio(dec.PTS, dec.audioBuffer)
dec.audioBuffer = nil
}
if err := dec.Skip(9); err != nil {
return err
}
psl, err := dec.ReadByte()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
case StartCodeSYS:
pusher.PrintDump("[sys]")
dec.PrintDump("</td><td>[sys]")
dec.ReadPayload()
//err = dec.decSystemHeader()
case StartCodeMAP:
err = dec.decProgramStreamMap()
pusher.PrintDump("[map]")
dec.decProgramStreamMap()
dec.PrintDump("</td><td>[map]")
case StartCodeVideo:
if dec.videoBuffer == nil {
dec.PrintDump("</td><td>")
}
if err = dec.decPESPacket(); err == nil {
// if len(video) == 0 {
// if dec.PTS == 0 {
// dec.PTS = ts
// }
// // if dec.DTS == 0 {
// // dec.DTS = dec.PTS
// // }
// }
video = append(video, dec.Payload...)
if dec.Payload[0] == 0 && dec.Payload[1] == 0 && dec.Payload[2] == 0 && dec.Payload[3] == 1 {
if len(dec.videoBuffer) > 0 {
dec.PushVideo(dec.PTS, dec.DTS, dec.videoBuffer)
dec.videoBuffer = nil
}
}
dec.videoBuffer = append(dec.videoBuffer, dec.Payload...)
} else {
fmt.Println("video", err)
}
pusher.PrintDump("[video]")
dec.PrintDump("[video]")
case StartCodeAudio:
if dec.audioBuffer == nil {
dec.PrintDump("</td><td>")
}
if err = dec.decPESPacket(); err == nil {
ts := ts / 90
if dec.PTS != 0 {
ts = dec.PTS / 90
}
pusher.PushAudio(ts, dec.Payload)
pusher.PrintDump("[audio]")
dec.audioBuffer = append(dec.audioBuffer, dec.Payload...)
dec.PrintDump("[audio]")
} else {
fmt.Println("audio", err)
}
case StartCodePS:
break loop
default:
pusher.PrintDump(fmt.Sprintf("[%d]", nextStartCode))
case PrivateStreamCode:
dec.ReadPayload()
dec.PrintDump("</td></tr><tr><td>[ac3]")
case MEPGProgramEndCode:
dec.PrintDump("</td></tr>")
return io.EOF
default:
fmt.Println("unknow code", code)
return ErrParsePakcet
}
}
if len(video) > 0 {
pusher.PrintDump("</td>")
pusher.PushVideo(dec.PTS, dec.DTS, video)
video = nil
}
if nextStartCode == StartCodePS {
// fmt.Println(aurora.Red("StartCodePS recursion..."), err)
return dec.Read(ts, pusher)
}
return err
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
/*
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
*/
func (dec *DecPSPackage) decProgramStreamMap() error {
psm, err := dec.ReadPayload()