diff --git a/example/default/config.yaml b/example/default/config.yaml index 5d8af48..3c42e37 100755 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -12,20 +12,20 @@ srt: listenaddr: :6000 passphrase: foobarfoobar gb28181: - enable: false # 是否启用GB28181协议 + enable: false # 是否启用GB28181协议 autoinvite: false #建议使用false,开启后会自动邀请设备推流 - mediaip: 192.168.1.21 #流媒体收流IP,外网情况下使用公网IP,内网情况下使用网卡IP,不要用127.0.0.1 - sipip: 192.168.1.21 #SIP通讯IP,不管公网还是内网都使用本机网卡IP,不要用127.0.0.1 + mediaip: 192.168.1.21 #流媒体收流IP,外网情况下使用公网IP,内网情况下使用网卡IP,不要用127.0.0.1 + sipip: 192.168.1.21 #SIP通讯IP,不管公网还是内网都使用本机网卡IP,不要用127.0.0.1 sip: listenaddr: - - udp::5060 + - udp::5060 onsub: pull: ^\d{20}/\d{20}$: $0 ^gb_\d+/(.+)$: $1 -# .* : $0 + # .* : $0 platforms: - - enable: false #是否启用平台 + - enable: false #是否启用平台 name: "测试平台" #平台名称 servergbid: "34020000002000000002" #上级平台GBID servergbdomain: "3402000000" #上级平台GB域 @@ -89,13 +89,10 @@ hls: # onpub: # transform: # .* : 5s x 3 -#rtsp: -# pull: - # live/test: rtsp://admin:1qaz2wsx3EDC@giroro.tpddns.cn:1554/Streaming/Channels/101 -# live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test s3: - auto: true # 启用自动上传 + enable: false + auto: true # 启用自动上传 deleteAfterUpload: false # 上传后保留本地文件 endpoint: "storage-dev.xiding.tech" accessKeyId: "xidinguser" @@ -105,30 +102,38 @@ s3: forcePathStyle: true useSSL: true +rtsp: + # pull: + # live/test: rtsp://admin:1qaz2wsx3EDC@58.212.158.30/Streaming/Channels/101 + # live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test +webrtc: + publish: + pubaudio: false + port: udp:9000-9100 snap: enable: false onpub: transform: .+: output: - - watermark: - text: "abcd" # 水印文字内容 - fontpath: /Users/dexter/Library/Fonts/MapleMono-NF-CN-Medium.ttf # 水印字体文件路径 - fontcolor: "rgba(255,165,0,1)" # 水印字体颜色,支持rgba格式 - fontsize: 36 # 水印字体大小 - offsetx: 0 # 水印位置X偏移 - offsety: 0 # 水印位置Y偏移 - timeinterval: 1s # 截图时间间隔 - savepath: "snaps" # 截图保存路径 - iframeinterval: 3 # 间隔多少帧截图 - querytimedelta: 3 # 查询截图时允许的最大时间差(秒) + - watermark: + text: "abcd" # 水印文字内容 + fontpath: /Users/dexter/Library/Fonts/MapleMono-NF-CN-Medium.ttf # 水印字体文件路径 + fontcolor: "rgba(255,165,0,1)" # 水印字体颜色,支持rgba格式 + fontsize: 36 # 水印字体大小 + offsetx: 0 # 水印位置X偏移 + offsety: 0 # 水印位置Y偏移 + timeinterval: 1s # 截图时间间隔 + savepath: "snaps" # 截图保存路径 + iframeinterval: 3 # 间隔多少帧截图 + querytimedelta: 3 # 查询截图时允许的最大时间差(秒) onvif: enable: false discoverinterval: 3 # 发现设备的间隔,单位秒,默认30秒,建议比rtsp插件的重连间隔大点 autopull: true autoadd: true interfaces: # 设备发现指定网卡,以及该网卡对应IP段的全局默认账号密码,支持多网卡 - - interfacename: 以太网 # 网卡名称 或者"以太网" "eth0"等,使用ipconfig 或者 ifconfig 查看网卡名称 + - interfacename: 以太网 # 网卡名称 或者"以太网" "eth0"等,使用ipconfig 或者 ifconfig 查看网卡名称 username: admin # onvif 账号 password: admin # onvif 密码 # - interfacename: WLAN 2 # 网卡2 @@ -140,4 +145,4 @@ onvif: # password: '123' # - ip: 192.168.1.2 # username: admin - # password: '456' \ No newline at end of file + # password: '456' diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 6995b3c..e22881b 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -136,18 +136,34 @@ func (d *Dialog) Start() (err error) { d.pullCtx.GoToStepConst(StepSIPPrepare) //defer d.gb.dialogs.Remove(d) - if d.gb.tcpPort > 0 { - d.MediaPort = d.gb.tcpPort - } else { - if d.gb.MediaPort.Valid() { - select { - case d.MediaPort = <-d.gb.tcpPorts: - default: - d.pullCtx.Fail("no available tcp port") - return fmt.Errorf("no available tcp port") - } + if d.StreamMode == mrtp.StreamModeTCPPassive { + if d.gb.tcpPort > 0 { + d.MediaPort = d.gb.tcpPort } else { - d.MediaPort = d.gb.MediaPort[0] + if d.gb.MediaPort.Valid() { + select { + case d.MediaPort = <-d.gb.tcpPorts: + default: + d.pullCtx.Fail("no available tcp port") + return fmt.Errorf("no available tcp port") + } + } else { + d.MediaPort = d.gb.MediaPort[0] + } + } + } else if d.StreamMode == mrtp.StreamModeUDP { + if d.gb.udpPort > 0 { + d.MediaPort = d.gb.udpPort + } else { + if d.gb.MediaPort.Valid() { + select { + case d.MediaPort = <-d.gb.udpPorts: + default: + return fmt.Errorf("no available udp port") + } + } else { + d.MediaPort = d.gb.MediaPort[0] + } } } @@ -214,7 +230,9 @@ func (d *Dialog) Start() (err error) { "a=connection:new", ) case mrtp.StreamModeUDP: + /* 支持udp收流 yjx return errors.New("do not support udp mode") + */ default: sdpInfo = append(sdpInfo, "a=setup:passive", @@ -354,9 +372,10 @@ func (d *Dialog) Run() (err error) { var pub mrtp.PSReceiver pub.Publisher = d.pullCtx.Publisher - if d.StreamMode == mrtp.StreamModeTCPActive { + switch d.StreamMode { + case mrtp.StreamModeTCPActive: pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort) - } else { + case mrtp.StreamModeTCPPassive: if d.gb.tcpPort > 0 { d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort) if d.gb.netListener != nil { @@ -370,6 +389,18 @@ func (d *Dialog) Run() (err error) { pub.SSRC = d.SSRC } pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) + case mrtp.StreamModeUDP: + if d.gb.udpPort > 0 { + d.Info("into single port mode, use gb.udpPort", d.gb.udpPort) + if d.gb.netUDPListener != nil { + d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr()) + } else { + d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort)) + d.gb.netUDPListener, _ = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4) + } + } + pub.SSRC = d.SSRC + pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) } pub.StreamMode = d.StreamMode return d.RunTask(&pub) @@ -380,9 +411,16 @@ func (d *Dialog) GetKey() string { } func (d *Dialog) Dispose() { - if d.gb.tcpPort == 0 { - // 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用 - d.gb.tcpPorts <- d.MediaPort + if d.StreamMode == mrtp.StreamModeUDP { + if d.gb.udpPort == 0 { //多端口 + // 如果没有设置udp端口,则将MediaPort设置为0,表示不再使用 + d.gb.udpPorts <- d.MediaPort + } + } else { + if d.gb.tcpPort == 0 { + // 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用 + d.gb.tcpPorts <- d.MediaPort + } } d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId) if d.session != nil { diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 6821d65..23588b6 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -17,6 +17,7 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/pion/rtp" "github.com/rs/zerolog" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" @@ -65,6 +66,9 @@ type GB28181Plugin struct { Platforms []*gb28181.PlatformModel channels util.Collection[string, *Channel] netListener net.Listener + udpPorts chan uint16 + udpPort uint16 + netUDPListener *net.UDPConn } var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ @@ -89,6 +93,14 @@ func (gb *GB28181Plugin) Dispose() { gb.Info("netListener closed") } } + if gb.netUDPListener != nil { + err := gb.netUDPListener.Close() + if err != nil { + gb.Error("Close netUDPListener error", "error", err) + } else { + gb.Info("netUDPListener closed") + } + } } func init() { @@ -173,15 +185,40 @@ func (gb *GB28181Plugin) Start() (err error) { if gb.MediaPort.Valid() { gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) + gb.udpPorts = make(chan uint16, gb.MediaPort.Size()) if gb.MediaPort.Size() == 0 { gb.tcpPort = gb.MediaPort[0] gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) + //support udp + { + gb.udpPort = gb.MediaPort[0] + gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) + + if err != nil { + gb.Error("start listen", "err", err) + return errors.New("start udp listen, err" + err.Error()) + } + go gb.ReadUdpInsinglePort() + } } else if gb.MediaPort.Size() == 1 { gb.tcpPort = gb.MediaPort[0] + 1 gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) + //support udp + { + gb.udpPort = gb.MediaPort[0] + 1 + gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) + + if err != nil { + gb.Error("start listen", "err", err) + return errors.New("start udp listen, err" + err.Error()) + } + + go gb.ReadUdpInsinglePort() + } } else { for i := range gb.MediaPort.Size() { gb.tcpPorts <- gb.MediaPort[0] + i + gb.udpPorts <- gb.MediaPort[0] + i } } } else { @@ -1009,3 +1046,19 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) { return } } + +func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) { + buffer := make(util.Buffer, 1024*1024) + var rtpPacket rtp.Packet + for { + n, _, err := gb.netUDPListener.ReadFromUDP(buffer) + if err != nil { + return err + } + + ps := buffer[:n] + if err := rtpPacket.Unmarshal(ps); err != nil { + continue + } + } +} diff --git a/plugin/rtp/pkg/reader.go b/plugin/rtp/pkg/reader.go index 7e92d45..f583d60 100644 --- a/plugin/rtp/pkg/reader.go +++ b/plugin/rtp/pkg/reader.go @@ -15,19 +15,33 @@ type IRTPReader interface { type RTPUDPReader struct { io.Reader - buf [MTUSize]byte + RTPReorder[*rtp.Packet] + UdpCacheSize int } func NewRTPUDPReader(r io.Reader) *RTPUDPReader { return &RTPUDPReader{Reader: r} } -func (r *RTPUDPReader) Read(packet *rtp.Packet) (err error) { - n, err := r.Reader.Read(r.buf[:]) - if err != nil { - return err +func (r *RTPUDPReader) Read(packet *rtp.Packet) error { + for { + ordered := r.Pop() + if ordered != nil { + *packet = *ordered + return nil + } + var buf [MTUSize]byte + var pack rtp.Packet + n, err := r.Reader.Read(buf[:]) + if err != nil { + return err + } + err = pack.Unmarshal(buf[:n]) + if err != nil { + return err + } + r.Push(pack.SequenceNumber, &pack) } - return packet.Unmarshal(r.buf[:n]) } type RTPTCPReader struct { diff --git a/plugin/rtp/pkg/reorder.go b/plugin/rtp/pkg/reorder.go new file mode 100644 index 0000000..58859c7 --- /dev/null +++ b/plugin/rtp/pkg/reorder.go @@ -0,0 +1,77 @@ +package rtp + +var RTPReorderBufferLen uint16 = 50 + +// RTPReorder RTP包乱序重排 +type RTPReorder[T comparable] struct { + lastSeq uint16 //最新收到的rtp包序号 + queue []T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空 + Total uint32 // 总共收到的包数量 + Drop uint32 // 丢弃的包数量 +} + +func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) { + p.Total++ + // 初始化 + if len(p.queue) == 0 { + p.lastSeq = seq + p.queue = make([]T, RTPReorderBufferLen) + return v + } + if seq < p.lastSeq && p.lastSeq-seq < 0x8000 { + // 旧的包直接丢弃 + p.Drop++ + return + } + delta := seq - p.lastSeq + if delta == 0 { + // 重复包 + p.Drop++ + return + } + if delta == 1 { + // 正常顺序,无需缓存 + p.lastSeq = seq + p.pop() + return v + } + if RTPReorderBufferLen < delta { + //超过缓存最大范围,无法挽回,只能造成丢包(序号断裂) + for { + p.lastSeq++ + delta-- + head := p.pop() + // 可以放得进去了 + if delta == RTPReorderBufferLen { + p.queue[RTPReorderBufferLen-1] = v + p.queue[0] = result + return head + } else if head != result { + p.Drop++ + } + } + } else { + // 出现后面的包先到达,缓存起来 + p.queue[delta-1] = v + return + } +} + +func (p *RTPReorder[T]) pop() (result T) { + copy(p.queue, p.queue[1:]) //整体数据向前移动一位,保持0号元素代表lastSeq+1 + p.queue[RTPReorderBufferLen-1] = result + return p.queue[0] +} + +// Pop 从缓存中取出一个包,需要连续调用直到返回nil +func (p *RTPReorder[T]) Pop() (result T) { + if len(p.queue) == 0 { + return + } + if next := p.queue[0]; next != result { + result = next + p.lastSeq++ + p.pop() + } + return +}